@@ -77,8 +77,12 @@ object V1Writes extends Rule[LogicalPlan] with SQLConfHelper {
     case write: V1WriteCommand if !write.child.isInstanceOf[WriteFiles] =>
       val newQuery = prepareQuery(write, write.query)
       val attrMap = AttributeMap(write.query.output.zip(newQuery.output))
-      val newChild = WriteFiles(newQuery, write.fileFormat, write.partitionColumns,
+      val writeFiles = WriteFiles(newQuery, write.fileFormat, write.partitionColumns,
         write.bucketSpec, write.options, write.staticPartitions)
+      val newChild = writeFiles.transformExpressions {
+        case a: Attribute if attrMap.contains(a) =>
+          a.withExprId(attrMap(a).exprId)
+      }
       val newWrite = write.withNewChildren(newChild :: Nil).transformExpressions {
         case a: Attribute if attrMap.contains(a) =>
           a.withExprId(attrMap(a).exprId)
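
With this change, the attribute remapping that the rule already applied to the write command itself is also applied to the freshly built `WriteFiles` node, so expressions such as `partitionColumns` reference the rewritten query's output by the new `exprId`s instead of the stale ones. A minimal sketch of the remapping primitive, assuming `spark-catalyst` on the classpath (the attribute names here are illustrative):

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference}
import org.apache.spark.sql.types.StringType

// "Same" column from before and after the query rewrite: identical name
// and type, but a different exprId, so plain equality fails.
val oldVal = AttributeReference("val", StringType)()
val newVal = AttributeReference("val", StringType)()
val attrMap = AttributeMap(Seq(oldVal -> newVal))

// The body of the rule's transformExpressions: keep the attribute, swap the id.
val remapped = oldVal.withExprId(attrMap(oldVal).exprId)
assert(remapped.exprId == newVal.exprId)
```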
@@ -52,6 +52,7 @@ case class WriteFiles(
     options: Map[String, String],
     staticPartitions: TablePartitionSpec) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
+  override protected def stringArgs: Iterator[Any] = Iterator(child)
   override protected def withNewChildInternal(newChild: LogicalPlan): WriteFiles =
     copy(child = newChild)
 }
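
`TreeNode.simpleString` builds a node's one-line description from `stringArgs`, and children are filtered out of that argument list; narrowing `stringArgs` to just the child therefore leaves nothing to print, which is why the golden plans below render the node as a bare `WriteFiles` instead of dumping the file format, options, and partition spec. A toy illustration of the pattern (not Spark's actual `TreeNode` code):

```scala
// A node that hides its constructor arguments from the one-line plan
// string, mirroring the stringArgs override on WriteFiles.
case class Node(child: Option[Node], format: String, options: Map[String, String]) {
  // What the one-line description may show (children get filtered out).
  protected def stringArgs: Iterator[Any] = Iterator(child)
  def simpleString: String =
    (productPrefix +: stringArgs.filterNot(_ == child).toSeq).mkString(" ")
}

// format/options no longer leak into the rendered plan.
assert(Node(None, "parquet", Map("compression" -> "snappy")).simpleString == "Node")
```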
sql/core/src/test/resources/sql-tests/inputs/explain.sql (5 additions, 0 deletions)
@@ -7,6 +7,7 @@ CREATE table explain_temp1 (key int, val int) USING PARQUET;
 CREATE table explain_temp2 (key int, val int) USING PARQUET;
 CREATE table explain_temp3 (key int, val int) USING PARQUET;
 CREATE table explain_temp4 (key int, val string) USING PARQUET;
+CREATE table explain_temp5 (key int) USING PARQUET PARTITIONED BY(val string);

 SET spark.sql.codegen.wholeStage = true;

@@ -119,11 +120,15 @@ EXPLAIN FORMATTED
 FROM explain_temp4
 GROUP BY key;

+-- V1 Write
+EXPLAIN EXTENDED INSERT INTO TABLE explain_temp5 SELECT * FROM explain_temp4;
+
 -- cleanup
 DROP TABLE explain_temp1;
 DROP TABLE explain_temp2;
 DROP TABLE explain_temp3;
 DROP TABLE explain_temp4;
+DROP TABLE explain_temp5;

 -- SPARK-35479: Format PartitionFilters IN strings in scan nodes
 CREATE table t(v array<string>) USING PARQUET;
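
The two golden result files that follow are not edited by hand; the usual workflow in the Spark repo is to regenerate them (e.g. `SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly *SQLQueryTestSuite"`) and review the resulting diff.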
sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out (48 additions, 0 deletions)
@@ -31,6 +31,14 @@ struct<>



+-- !query
+CREATE table explain_temp5 (key int) USING PARQUET PARTITIONED BY(val string)
+-- !query schema
+struct<>
+-- !query output
+
+
+
 -- !query
 SET spark.sql.codegen.wholeStage = true
 -- !query schema
@@ -1067,6 +1075,38 @@ Output [2]: [key#x, min(val)#x]
 Arguments: isFinalPlan=false


+-- !query
+EXPLAIN EXTENDED INSERT INTO TABLE explain_temp5 SELECT * FROM explain_temp4
+-- !query schema
+struct<plan:string>
+-- !query output
+== Parsed Logical Plan ==
+'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [], false, false, false
++- 'Project [*]
+   +- 'UnresolvedRelation [explain_temp4], [], false
+
+== Analyzed Logical Plan ==
+InsertIntoHadoopFsRelationCommand Location [not included in comparison]/{warehouse_dir}/explain_temp5], Append, `spark_catalog`.`default`.`explain_temp5`, org.apache.spark.sql.execution.datasources.CatalogFileIndex@7d811218, [key, val]
++- Project [key#x, val#x]
+   +- SubqueryAlias spark_catalog.default.explain_temp4
+      +- Relation spark_catalog.default.explain_temp4[key#x,val#x] parquet
+
+== Optimized Logical Plan ==
+InsertIntoHadoopFsRelationCommand Location [not included in comparison]/{warehouse_dir}/explain_temp5], Append, `spark_catalog`.`default`.`explain_temp5`, org.apache.spark.sql.execution.datasources.CatalogFileIndex@7d811218, [key, val]
++- WriteFiles
+   +- Sort [val#x ASC NULLS FIRST], false
+      +- Project [key#x, empty2null(val#x) AS val#x]
+         +- Relation spark_catalog.default.explain_temp4[key#x,val#x] parquet
+
+== Physical Plan ==
+Execute InsertIntoHadoopFsRelationCommand Location [not included in comparison]/{warehouse_dir}/explain_temp5], Append, `spark_catalog`.`default`.`explain_temp5`, org.apache.spark.sql.execution.datasources.CatalogFileIndex@7d811218, [key, val]
++- WriteFiles
+   +- *Sort [val#x ASC NULLS FIRST], false, 0
+      +- *Project [key#x, empty2null(val#x) AS val#x]
+         +- *ColumnarToRow
+            +- FileScan parquet spark_catalog.default.explain_temp4[key#x,val#x] Batched: true, DataFilters: [], Format: Parquet, Location [not included in comparison]/{warehouse_dir}/explain_temp4], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:int,val:string>
+
+
 -- !query
 DROP TABLE explain_temp1
 -- !query schema
@@ -1099,6 +1139,14 @@ struct<>



+-- !query
+DROP TABLE explain_temp5
+-- !query schema
+struct<>
+-- !query output
+
+
+
 -- !query
 CREATE table t(v array<string>) USING PARQUET
 -- !query schema
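
A note on the plans above: the `Sort [val#x ASC NULLS FIRST]` and the `empty2null(val#x)` projection come from the planned-write logic, not from the query itself. Sorting by the partition column lets each write task produce one partition at a time, and `empty2null` reflects that an empty string cannot serve as a partition directory name, so it is stored as the null partition instead.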
sql/core/src/test/resources/sql-tests/results/explain.sql.out (48 additions, 0 deletions)
@@ -31,6 +31,14 @@ struct<>



+-- !query
+CREATE table explain_temp5 (key int) USING PARQUET PARTITIONED BY(val string)
+-- !query schema
+struct<>
+-- !query output
+
+
+
 -- !query
 SET spark.sql.codegen.wholeStage = true
 -- !query schema
@@ -1009,6 +1017,38 @@ Aggregate Attributes [1]: [min(val#x)#x]
 Results [2]: [key#x, min(val#x)#x AS min(val)#x]


+-- !query
+EXPLAIN EXTENDED INSERT INTO TABLE explain_temp5 SELECT * FROM explain_temp4
+-- !query schema
+struct<plan:string>
+-- !query output
+== Parsed Logical Plan ==
+'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [], false, false, false
++- 'Project [*]
+   +- 'UnresolvedRelation [explain_temp4], [], false
+
+== Analyzed Logical Plan ==
+InsertIntoHadoopFsRelationCommand Location [not included in comparison]/{warehouse_dir}/explain_temp5], Append, `spark_catalog`.`default`.`explain_temp5`, org.apache.spark.sql.execution.datasources.CatalogFileIndex@7d811218, [key, val]
++- Project [key#x, val#x]
+   +- SubqueryAlias spark_catalog.default.explain_temp4
+      +- Relation spark_catalog.default.explain_temp4[key#x,val#x] parquet
+
+== Optimized Logical Plan ==
+InsertIntoHadoopFsRelationCommand Location [not included in comparison]/{warehouse_dir}/explain_temp5], Append, `spark_catalog`.`default`.`explain_temp5`, org.apache.spark.sql.execution.datasources.CatalogFileIndex@7d811218, [key, val]
++- WriteFiles
+   +- Sort [val#x ASC NULLS FIRST], false
+      +- Project [key#x, empty2null(val#x) AS val#x]
+         +- Relation spark_catalog.default.explain_temp4[key#x,val#x] parquet
+
+== Physical Plan ==
+Execute InsertIntoHadoopFsRelationCommand Location [not included in comparison]/{warehouse_dir}/explain_temp5], Append, `spark_catalog`.`default`.`explain_temp5`, org.apache.spark.sql.execution.datasources.CatalogFileIndex@7d811218, [key, val]
++- WriteFiles
+   +- *Sort [val#x ASC NULLS FIRST], false, 0
+      +- *Project [key#x, empty2null(val#x) AS val#x]
+         +- *ColumnarToRow
+            +- FileScan parquet spark_catalog.default.explain_temp4[key#x,val#x] Batched: true, DataFilters: [], Format: Parquet, Location [not included in comparison]/{warehouse_dir}/explain_temp4], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:int,val:string>
+
+
 -- !query
 DROP TABLE explain_temp1
 -- !query schema
@@ -1041,6 +1081,14 @@ struct<>



+-- !query
+DROP TABLE explain_temp5
+-- !query schema
+struct<>
+-- !query output
+
+
+
 -- !query
 CREATE table t(v array<string>) USING PARQUET
 -- !query schema
@@ -41,6 +41,7 @@ trait SQLQueryTestHelper {
       .replaceAll(
         s"Location.*$clsName/",
         s"Location $notIncludedMsg/{warehouse_dir}/")
+      .replaceAll(s"file:.*$clsName", s"Location $notIncludedMsg/{warehouse_dir}")
       .replaceAll("Created By.*", s"Created By $notIncludedMsg")
       .replaceAll("Created Time.*", s"Created Time $notIncludedMsg")
       .replaceAll("Last Access.*", s"Last Access $notIncludedMsg")
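
The existing rule only rewrites strings prefixed with `Location`, but nodes such as `InsertIntoHadoopFsRelationCommand` in the EXPLAIN EXTENDED output above can print a bare `file:` URI containing the machine-specific warehouse path. A minimal sketch of the added rewrite, with illustrative stand-ins for `clsName` and `notIncludedMsg`:

```scala
val clsName = "SQLQueryTestSuite"                   // stand-in value
val notIncludedMsg = "[not included in comparison]" // stand-in value

val line = "InsertIntoHadoopFsRelationCommand " +
  s"file:/home/jenkins/spark/sql/core/spark-warehouse/$clsName/explain_temp5, false, Parquet"
val normalized =
  line.replaceAll(s"file:.*$clsName", s"Location $notIncludedMsg/{warehouse_dir}")
// "InsertIntoHadoopFsRelationCommand Location [not included in comparison]/{warehouse_dir}/explain_temp5, false, Parquet"
assert(normalized.startsWith("InsertIntoHadoopFsRelationCommand Location"))
```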
@@ -69,6 +69,7 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils {
       case w: V1WriteCommand =>
         if (hasLogicalSort && conf.getConf(SQLConf.PLANNED_WRITE_ENABLED)) {
           assert(w.query.isInstanceOf[WriteFiles])
+          assert(w.partitionColumns == w.query.asInstanceOf[WriteFiles].partitionColumns)
           optimizedPlan = w.query.asInstanceOf[WriteFiles].child
         } else {
           optimizedPlan = w.query
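
This assertion is what the `transformExpressions` fix in `V1Writes` makes pass: attribute equality includes the `exprId`, so before the fix the command's remapped `partitionColumns` compared unequal to the stale ones kept inside `WriteFiles`. A small demonstration of that equality rule, assuming `spark-catalyst` on the classpath:

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.StringType

// Same name and type, but fresh exprIds from two separate analyses.
val before = AttributeReference("val", StringType)()
val after = AttributeReference("val", StringType)()
assert(before != after)                           // exprIds differ
assert(before == after.withExprId(before.exprId)) // equal once remapped
```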
@@ -22,7 +22,6 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.ql.plan.TableDesc

-import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -286,7 +285,7 @@ case class InsertIntoHiveTable(
     copy(query = newChild)
 }

-object InsertIntoHiveTable extends V1WritesHiveUtils with Logging {
+object InsertIntoHiveTable extends V1WritesHiveUtils {
   def apply(
       table: CatalogTable,
       partition: Map[String, Option[String]],