Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -500,25 +500,44 @@ abstract class UnresolvedStarBase(target: Option[Seq[String]]) extends Star with
override def expand(
input: LogicalPlan,
resolver: Resolver): Seq[NamedExpression] = {
expandStar(input.output, input.metadataOutput, input.resolve, input.inputSet.toSeq, resolver)
}

/**
* Method used to expand a star. It uses output and metadata output attributes of the child
* for the expansion and it supports both recursive and non-recursive data types.
*
* @param childOperatorOutput The output attributes of the child operator
* @param childOperatorMetadataOutput The metadata output attributes of the child operator
* @param resolve A function to resolve the given name parts to an attribute
* @param suggestedAttributes A list of attributes that are suggested for expansion
* @param resolver The resolver used to match the name parts
*/
def expandStar(
childOperatorOutput: Seq[Attribute],
childOperatorMetadataOutput: Seq[Attribute],
resolve: (Seq[String], Resolver) => Option[NamedExpression],
suggestedAttributes: Seq[Attribute],
resolver: Resolver): Seq[NamedExpression] = {
// If there is no table specified, use all non-hidden input attributes.
if (target.isEmpty) return input.output
if (target.isEmpty) return childOperatorOutput

// If there is a table specified, use hidden input attributes as well
val hiddenOutput = input.metadataOutput.filter(_.qualifiedAccessOnly)
val hiddenOutput = childOperatorMetadataOutput.filter(_.qualifiedAccessOnly)
// Remove the qualified-access-only restriction immediately. The expanded attributes will be
// put in a logical plan node and becomes normal attributes. They can still keep the special
// attribute metadata to indicate that they are from metadata columns, but they should not
// keep any restrictions that may break column resolution for normal attributes.
// See SPARK-42084 for more details.
.map(_.markAsAllowAnyAccess())
val expandedAttributes = (hiddenOutput ++ input.output).filter(
val expandedAttributes = (hiddenOutput ++ childOperatorOutput).filter(
matchedQualifier(_, target.get, resolver))

if (expandedAttributes.nonEmpty) return expandedAttributes

// Try to resolve it as a struct expansion. If there is a conflict and both are possible,
// (i.e. [name].* is both a table and a struct), the struct path can always be qualified.
val attribute = input.resolve(target.get, resolver)
val attribute = resolve(target.get, resolver)
if (attribute.isDefined) {
// This target resolved to an attribute in child. It must be a struct. Expand it.
attribute.get.dataType match {
Expand All @@ -532,7 +551,7 @@ abstract class UnresolvedStarBase(target: Option[Seq[String]]) extends Star with
throw QueryCompilationErrors.starExpandDataTypeNotSupportedError(target.get)
}
} else {
val from = input.inputSet.map(_.name).map(toSQLId).mkString(", ")
val from = suggestedAttributes.map(_.name).map(toSQLId).mkString(", ")
val targetString = target.get.mkString(".")
throw QueryCompilationErrors.cannotResolveStarExpandGivenInputColumnsError(
targetString, from)
Expand Down