Skip to content

Commit 6360723

Browse files
committed
Made PreInsertionCasts support SparkLogicalPlan and InMemoryColumnarTableScan
1 parent c9b0f6f commit 6360723

File tree

1 file changed

+26
-16
lines changed

1 file changed

+26
-16
lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ import org.apache.spark.sql.catalyst.plans.logical
3333
import org.apache.spark.sql.catalyst.plans.logical._
3434
import org.apache.spark.sql.catalyst.rules._
3535
import org.apache.spark.sql.catalyst.types._
36+
import org.apache.spark.sql.execution.SparkLogicalPlan
37+
import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
3638

3739
/* Implicit conversions */
3840
import scala.collection.JavaConversions._
@@ -115,23 +117,31 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging {
115117
case p: LogicalPlan if !p.childrenResolved => p
116118

117119
case p @ InsertIntoTable(table: MetastoreRelation, _, child, _) =>
118-
val childOutputDataTypes = child.output.map(_.dataType)
119-
// Only check attributes, not partitionKeys since they are always strings.
120-
// TODO: Fully support inserting into partitioned tables.
121-
val tableOutputDataTypes = table.attributes.map(_.dataType)
122-
123-
if (childOutputDataTypes == tableOutputDataTypes) {
124-
p
125-
} else {
126-
// Only do the casting when child output data types differ from table output data types.
127-
val castedChildOutput = child.output.zip(table.output).map {
128-
case (input, output) if input.dataType != output.dataType =>
129-
Alias(Cast(input, output.dataType), input.name)()
130-
case (input, _) => input
131-
}
132-
133-
p.copy(child = logical.Project(castedChildOutput, child))
120+
castChildOutput(p, table, child)
121+
122+
case p @ logical.InsertIntoTable(SparkLogicalPlan(InMemoryColumnarTableScan(
123+
_, HiveTableScan(_, table, _))), _, child, _) =>
124+
castChildOutput(p, table, child)
125+
}
126+
127+
def castChildOutput(p: InsertIntoTable, table: MetastoreRelation, child: LogicalPlan) = {
128+
val childOutputDataTypes = child.output.map(_.dataType)
129+
// Only check attributes, not partitionKeys since they are always strings.
130+
// TODO: Fully support inserting into partitioned tables.
131+
val tableOutputDataTypes = table.attributes.map(_.dataType)
132+
133+
if (childOutputDataTypes == tableOutputDataTypes) {
134+
p
135+
} else {
136+
// Only do the casting when child output data types differ from table output data types.
137+
val castedChildOutput = child.output.zip(table.output).map {
138+
case (input, output) if input.dataType != output.dataType =>
139+
Alias(Cast(input, output.dataType), input.name)()
140+
case (input, _) => input
134141
}
142+
143+
p.copy(child = logical.Project(castedChildOutput, child))
144+
}
135145
}
136146
}
137147

0 commit comments

Comments
 (0)