Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1958,15 +1958,7 @@ class Analyzer(
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case j @ Join(left, right, UsingJoin(joinType, usingCols), condition)
if left.resolved && right.resolved && j.duplicateResolved =>
// Resolve the column names referenced in using clause from both the legs of join.
val lCols = usingCols.flatMap(col => left.resolveQuoted(col.name, resolver))
val rCols = usingCols.flatMap(col => right.resolveQuoted(col.name, resolver))
if ((lCols.length == usingCols.length) && (rCols.length == usingCols.length)) {
val joinNames = lCols.map(exp => exp.name)
commonNaturalJoinProcessing(left, right, joinType, joinNames, None)
} else {
j
}
commonNaturalJoinProcessing(left, right, joinType, usingCols, None)
case j @ Join(left, right, NaturalJoin(joinType), condition) if j.resolvedExceptNatural =>
// find common column names from both sides
val joinNames = left.output.map(_.name).intersect(right.output.map(_.name))
Expand All @@ -1981,18 +1973,16 @@ class Analyzer(
joinNames: Seq[String],
condition: Option[Expression]) = {
val leftKeys = joinNames.map { keyName =>
val joinColumn = left.output.find(attr => resolver(attr.name, keyName))
assert(
joinColumn.isDefined,
s"$keyName should exist in ${left.output.map(_.name).mkString(",")}")
joinColumn.get
left.output.find(attr => resolver(attr.name, keyName)).getOrElse {
throw new AnalysisException(s"USING column `$keyName` can not be resolved with the " +
s"left join side, the left output is: [${left.output.map(_.name).mkString(", ")}]")
}
}
val rightKeys = joinNames.map { keyName =>
val joinColumn = right.output.find(attr => resolver(attr.name, keyName))
assert(
joinColumn.isDefined,
s"$keyName should exist in ${right.output.map(_.name).mkString(",")}")
joinColumn.get
right.output.find(attr => resolver(attr.name, keyName)).getOrElse {
throw new AnalysisException(s"USING column `$keyName` can not be resolved with the " +
s"right join side, the right output is: [${right.output.map(_.name).mkString(", ")}]")
}
}
val joinPairs = leftKeys.zip(rightKeys)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,12 +190,6 @@ trait CheckAnalysis extends PredicateHelper {
case e =>
}

case j @ Join(_, _, UsingJoin(_, cols), _) =>
val from = operator.inputSet.map(_.name).mkString(", ")
failAnalysis(
s"using columns [${cols.mkString(",")}] " +
s"can not be resolved given input columns: [$from] ")

case j @ Join(_, _, _, Some(condition)) if condition.dataType != BooleanType =>
failAnalysis(
s"join condition '${condition.sql}' " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,10 +555,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
// Resolve the join type and join condition
val (joinType, condition) = Option(join.joinCriteria) match {
case Some(c) if c.USING != null =>
val columns = c.identifier.asScala.map { column =>
UnresolvedAttribute.quoted(column.getText)
}
(UsingJoin(baseJoinType, columns), None)
(UsingJoin(baseJoinType, c.identifier.asScala.map(_.getText)), None)
case Some(c) if c.booleanExpression != null =>
(baseJoinType, Option(expression(c.booleanExpression)))
case None if join.NATURAL != null =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ case class NaturalJoin(tpe: JoinType) extends JoinType {
override def sql: String = "NATURAL " + tpe.sql
}

case class UsingJoin(tpe: JoinType, usingColumns: Seq[UnresolvedAttribute]) extends JoinType {
case class UsingJoin(tpe: JoinType, usingColumns: Seq[String]) extends JoinType {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the USING column can never has a qualifier, or be a nested field, we don't need to use UnresolvedAttribute here.

Copy link
Member

@gatorsmile gatorsmile Dec 1, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we do not support the nested field. It also fails with your newly changed error.

    sql("CREATE TABLE complexTypeTable (s struct<i: string>)")
    val df = table("complexTypeTable")
    df.as("b").join(df.as("a"), "s.i").show()

Could you add the test case for it? Thanks!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Submitted a follow-up PR #16110 for the test case of nested fields. When we implementing using join, we did not add any test case for nested fields. Thus, it was not covered before.

require(Seq(Inner, LeftOuter, LeftSemi, RightOuter, FullOuter, LeftAnti).contains(tpe),
"Unsupported using join type " + tpe)
override def sql: String = "USING " + tpe.sql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,31 +38,31 @@ class ResolveNaturalJoinSuite extends AnalysisTest {

test("natural/using inner join") {
val naturalPlan = r1.join(r2, NaturalJoin(Inner), None)
val usingPlan = r1.join(r2, UsingJoin(Inner, Seq(UnresolvedAttribute("a"))), None)
val usingPlan = r1.join(r2, UsingJoin(Inner, Seq("a")), None)
val expected = r1.join(r2, Inner, Some(EqualTo(a, a))).select(a, b, c)
checkAnalysis(naturalPlan, expected)
checkAnalysis(usingPlan, expected)
}

test("natural/using left join") {
val naturalPlan = r1.join(r2, NaturalJoin(LeftOuter), None)
val usingPlan = r1.join(r2, UsingJoin(LeftOuter, Seq(UnresolvedAttribute("a"))), None)
val usingPlan = r1.join(r2, UsingJoin(LeftOuter, Seq("a")), None)
val expected = r1.join(r2, LeftOuter, Some(EqualTo(a, a))).select(a, b, c)
checkAnalysis(naturalPlan, expected)
checkAnalysis(usingPlan, expected)
}

test("natural/using right join") {
val naturalPlan = r1.join(r2, NaturalJoin(RightOuter), None)
val usingPlan = r1.join(r2, UsingJoin(RightOuter, Seq(UnresolvedAttribute("a"))), None)
val usingPlan = r1.join(r2, UsingJoin(RightOuter, Seq("a")), None)
val expected = r1.join(r2, RightOuter, Some(EqualTo(a, a))).select(a, b, c)
checkAnalysis(naturalPlan, expected)
checkAnalysis(usingPlan, expected)
}

test("natural/using full outer join") {
val naturalPlan = r1.join(r2, NaturalJoin(FullOuter), None)
val usingPlan = r1.join(r2, UsingJoin(FullOuter, Seq(UnresolvedAttribute("a"))), None)
val usingPlan = r1.join(r2, UsingJoin(FullOuter, Seq("a")), None)
val expected = r1.join(r2, FullOuter, Some(EqualTo(a, a))).select(
Alias(Coalesce(Seq(a, a)), "a")(), b, c)
checkAnalysis(naturalPlan, expected)
Expand All @@ -71,7 +71,7 @@ class ResolveNaturalJoinSuite extends AnalysisTest {

test("natural/using inner join with no nullability") {
val naturalPlan = r3.join(r4, NaturalJoin(Inner), None)
val usingPlan = r3.join(r4, UsingJoin(Inner, Seq(UnresolvedAttribute("b"))), None)
val usingPlan = r3.join(r4, UsingJoin(Inner, Seq("b")), None)
val expected = r3.join(r4, Inner, Some(EqualTo(bNotNull, bNotNull))).select(
bNotNull, aNotNull, cNotNull)
checkAnalysis(naturalPlan, expected)
Expand All @@ -80,7 +80,7 @@ class ResolveNaturalJoinSuite extends AnalysisTest {

test("natural/using left join with no nullability") {
val naturalPlan = r3.join(r4, NaturalJoin(LeftOuter), None)
val usingPlan = r3.join(r4, UsingJoin(LeftOuter, Seq(UnresolvedAttribute("b"))), None)
val usingPlan = r3.join(r4, UsingJoin(LeftOuter, Seq("b")), None)
val expected = r3.join(r4, LeftOuter, Some(EqualTo(bNotNull, bNotNull))).select(
bNotNull, aNotNull, c)
checkAnalysis(naturalPlan, expected)
Expand All @@ -89,7 +89,7 @@ class ResolveNaturalJoinSuite extends AnalysisTest {

test("natural/using right join with no nullability") {
val naturalPlan = r3.join(r4, NaturalJoin(RightOuter), None)
val usingPlan = r3.join(r4, UsingJoin(RightOuter, Seq(UnresolvedAttribute("b"))), None)
val usingPlan = r3.join(r4, UsingJoin(RightOuter, Seq("b")), None)
val expected = r3.join(r4, RightOuter, Some(EqualTo(bNotNull, bNotNull))).select(
bNotNull, a, cNotNull)
checkAnalysis(naturalPlan, expected)
Expand All @@ -98,48 +98,43 @@ class ResolveNaturalJoinSuite extends AnalysisTest {

test("natural/using full outer join with no nullability") {
val naturalPlan = r3.join(r4, NaturalJoin(FullOuter), None)
val usingPlan = r3.join(r4, UsingJoin(FullOuter, Seq(UnresolvedAttribute("b"))), None)
val usingPlan = r3.join(r4, UsingJoin(FullOuter, Seq("b")), None)
val expected = r3.join(r4, FullOuter, Some(EqualTo(bNotNull, bNotNull))).select(
Alias(Coalesce(Seq(b, b)), "b")(), a, c)
checkAnalysis(naturalPlan, expected)
checkAnalysis(usingPlan, expected)
}

test("using unresolved attribute") {
val usingPlan = r1.join(r2, UsingJoin(Inner, Seq(UnresolvedAttribute("d"))), None)
val error = intercept[AnalysisException] {
SimpleAnalyzer.checkAnalysis(usingPlan)
}
assert(error.message.contains(
"using columns ['d] can not be resolved given input columns: [b, a, c]"))
assertAnalysisError(
r1.join(r2, UsingJoin(Inner, Seq("d"))),
"USING column `d` can not be resolved with the left join side" :: Nil)
assertAnalysisError(
r1.join(r2, UsingJoin(Inner, Seq("b"))),
"USING column `b` can not be resolved with the right join side" :: Nil)
}

test("using join with a case sensitive analyzer") {
val expected = r1.join(r2, Inner, Some(EqualTo(a, a))).select(a, b, c)

{
val usingPlan = r1.join(r2, UsingJoin(Inner, Seq(UnresolvedAttribute("a"))), None)
checkAnalysis(usingPlan, expected, caseSensitive = true)
}
val usingPlan = r1.join(r2, UsingJoin(Inner, Seq("a")), None)
checkAnalysis(usingPlan, expected, caseSensitive = true)

{
val usingPlan = r1.join(r2, UsingJoin(Inner, Seq(UnresolvedAttribute("A"))), None)
assertAnalysisError(
usingPlan,
Seq("using columns ['A] can not be resolved given input columns: [b, a, c, a]"))
}
assertAnalysisError(
r1.join(r2, UsingJoin(Inner, Seq("A"))),
"USING column `A` can not be resolved with the left join side" :: Nil)
}

test("using join with a case insensitive analyzer") {
val expected = r1.join(r2, Inner, Some(EqualTo(a, a))).select(a, b, c)

{
val usingPlan = r1.join(r2, UsingJoin(Inner, Seq(UnresolvedAttribute("a"))), None)
val usingPlan = r1.join(r2, UsingJoin(Inner, Seq("a")), None)
checkAnalysis(usingPlan, expected, caseSensitive = false)
}

{
val usingPlan = r1.join(r2, UsingJoin(Inner, Seq(UnresolvedAttribute("A"))), None)
val usingPlan = r1.join(r2, UsingJoin(Inner, Seq("A")), None)
checkAnalysis(usingPlan, expected, caseSensitive = false)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ class PlanParserSuite extends PlanTest {
val testUsingJoin = (sql: String, jt: JoinType) => {
assertEqual(
s"select * from t $sql u using(a, b)",
table("t").join(table("u"), UsingJoin(jt, Seq('a.attr, 'b.attr)), None).select(star()))
table("t").join(table("u"), UsingJoin(jt, Seq("a", "b")), None).select(star()))
}
val testAll = Seq(testUnconditionalJoin, testConditionalJoin, testNaturalJoin, testUsingJoin)
val testExistence = Seq(testUnconditionalJoin, testConditionalJoin, testUsingJoin)
Expand Down
2 changes: 1 addition & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ class Dataset[T] private[sql](
Join(
joined.left,
joined.right,
UsingJoin(JoinType(joinType), usingColumns.map(UnresolvedAttribute(_))),
UsingJoin(JoinType(joinType), usingColumns),
None)
}
}
Expand Down