From e66bdcc9ad2069d7773de89cb10b999757782e8e Mon Sep 17 00:00:00 2001 From: Mihailo Aleksic Date: Wed, 23 Oct 2024 11:34:38 +0200 Subject: [PATCH 1/3] initial commit --- .../sql/catalyst/analysis/unresolved.scala | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 6f445b1e88d70..e187d7a9e8ad0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -500,25 +500,34 @@ abstract class UnresolvedStarBase(target: Option[Seq[String]]) extends Star with override def expand( input: LogicalPlan, resolver: Resolver): Seq[NamedExpression] = { + expandStar(input.output, input.metadataOutput, input.resolve, input.outputSet, resolver) + } + + def expandStar( + output: Seq[Attribute], + metadataOutput: Seq[Attribute], + resolve: (Seq[String], Resolver) => Option[NamedExpression], + inputSet: AttributeSet, + resolver: Resolver): Seq[NamedExpression] = { // If there is no table specified, use all non-hidden input attributes. - if (target.isEmpty) return input.output + if (target.isEmpty) return output // If there is a table specified, use hidden input attributes as well - val hiddenOutput = input.metadataOutput.filter(_.qualifiedAccessOnly) + val hiddenOutput = metadataOutput.filter(_.qualifiedAccessOnly) // Remove the qualified-access-only restriction immediately. The expanded attributes will be // put in a logical plan node and becomes normal attributes. They can still keep the special // attribute metadata to indicate that they are from metadata columns, but they should not // keep any restrictions that may break column resolution for normal attributes. // See SPARK-42084 for more details. .map(_.markAsAllowAnyAccess()) - val expandedAttributes = (hiddenOutput ++ input.output).filter( + val expandedAttributes = (hiddenOutput ++ output).filter( matchedQualifier(_, target.get, resolver)) if (expandedAttributes.nonEmpty) return expandedAttributes // Try to resolve it as a struct expansion. If there is a conflict and both are possible, // (i.e. [name].* is both a table and a struct), the struct path can always be qualified. - val attribute = input.resolve(target.get, resolver) + val attribute = resolve(target.get, resolver) if (attribute.isDefined) { // This target resolved to an attribute in child. It must be a struct. Expand it. attribute.get.dataType match { @@ -532,7 +541,7 @@ abstract class UnresolvedStarBase(target: Option[Seq[String]]) extends Star with throw QueryCompilationErrors.starExpandDataTypeNotSupportedError(target.get) } } else { - val from = input.inputSet.map(_.name).map(toSQLId).mkString(", ") + val from = inputSet.map(_.name).map(toSQLId).mkString(", ") val targetString = target.get.mkString(".") throw QueryCompilationErrors.cannotResolveStarExpandGivenInputColumnsError( targetString, from) From 6baaee9601dba5a3aa92ebd2c32a55aae24b101d Mon Sep 17 00:00:00 2001 From: Mihailo Aleksic Date: Wed, 23 Oct 2024 12:53:32 +0200 Subject: [PATCH 2/3] refactor --- .../sql/catalyst/analysis/unresolved.scala | 26 +++++++++++++------ 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index e187d7a9e8ad0..7b15e1f84399e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -500,27 +500,37 @@ abstract class UnresolvedStarBase(target: Option[Seq[String]]) extends Star with override def expand( input: LogicalPlan, resolver: Resolver): Seq[NamedExpression] = { - expandStar(input.output, input.metadataOutput, input.resolve, input.outputSet, resolver) + expandStar(input.output, input.metadataOutput, input.resolve, input.inputSet.toSeq, resolver) } + /** + * Method used to expand a star. It uses output and metadata output attributes of the child + * for the expansion and it supports both recursive and non-recursive data types. + * + * @param childOperatorOutput the output attributes of the child operator + * @param childOperatorMetadataOutput the metadata output attributes of the child operator + * @param resolve a function to resolve the given name parts to an attribute + * @param suggestedAttributes a list of attributes that are suggested for expansion + * @param resolver the resolver used to match the name parts + */ def expandStar( - output: Seq[Attribute], - metadataOutput: Seq[Attribute], + childOperatorOutput: Seq[Attribute], + childOperatorMetadataOutput: Seq[Attribute], resolve: (Seq[String], Resolver) => Option[NamedExpression], - inputSet: AttributeSet, + suggestedAttributes: Seq[Attribute], resolver: Resolver): Seq[NamedExpression] = { // If there is no table specified, use all non-hidden input attributes. - if (target.isEmpty) return output + if (target.isEmpty) return childOperatorOutput // If there is a table specified, use hidden input attributes as well - val hiddenOutput = metadataOutput.filter(_.qualifiedAccessOnly) + val hiddenOutput = childOperatorMetadataOutput.filter(_.qualifiedAccessOnly) // Remove the qualified-access-only restriction immediately. The expanded attributes will be // put in a logical plan node and becomes normal attributes. They can still keep the special // attribute metadata to indicate that they are from metadata columns, but they should not // keep any restrictions that may break column resolution for normal attributes. // See SPARK-42084 for more details. .map(_.markAsAllowAnyAccess()) - val expandedAttributes = (hiddenOutput ++ output).filter( + val expandedAttributes = (hiddenOutput ++ childOperatorOutput).filter( matchedQualifier(_, target.get, resolver)) if (expandedAttributes.nonEmpty) return expandedAttributes @@ -541,7 +551,7 @@ abstract class UnresolvedStarBase(target: Option[Seq[String]]) extends Star with throw QueryCompilationErrors.starExpandDataTypeNotSupportedError(target.get) } } else { - val from = inputSet.map(_.name).map(toSQLId).mkString(", ") + val from = suggestedAttributes.map(_.name).map(toSQLId).mkString(", ") val targetString = target.get.mkString(".") throw QueryCompilationErrors.cannotResolveStarExpandGivenInputColumnsError( targetString, from) From cef11c57ecbb5297cc86f9326ca207082a2bf1a9 Mon Sep 17 00:00:00 2001 From: Mihailo Aleksic Date: Wed, 23 Oct 2024 14:34:50 +0200 Subject: [PATCH 3/3] nits --- .../spark/sql/catalyst/analysis/unresolved.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 7b15e1f84399e..389e939bd8e2f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -507,11 +507,11 @@ abstract class UnresolvedStarBase(target: Option[Seq[String]]) extends Star with * Method used to expand a star. It uses output and metadata output attributes of the child * for the expansion and it supports both recursive and non-recursive data types. * - * @param childOperatorOutput the output attributes of the child operator - * @param childOperatorMetadataOutput the metadata output attributes of the child operator - * @param resolve a function to resolve the given name parts to an attribute - * @param suggestedAttributes a list of attributes that are suggested for expansion - * @param resolver the resolver used to match the name parts + * @param childOperatorOutput The output attributes of the child operator + * @param childOperatorMetadataOutput The metadata output attributes of the child operator + * @param resolve A function to resolve the given name parts to an attribute + * @param suggestedAttributes A list of attributes that are suggested for expansion + * @param resolver The resolver used to match the name parts */ def expandStar( childOperatorOutput: Seq[Attribute],