From 099ac9f3b08a7b592449a7a31fc31152c6200343 Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Wed, 1 Feb 2023 10:25:03 +0800 Subject: [PATCH] Improve v1 writes with empty2null --- .../sql/execution/datasources/FileFormatWriter.scala | 9 ++------- .../spark/sql/execution/datasources/V1Writes.scala | 10 ++-------- 2 files changed, 4 insertions(+), 15 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index 2491c9d7754e0..8321b1fac71ee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -206,13 +206,8 @@ object FileFormatWriter extends Logging { partitionColumns: Seq[Attribute], sortColumns: Seq[Attribute], orderingMatched: Boolean): Set[String] = { - val hasEmpty2Null = plan.exists(p => V1WritesUtils.hasEmptyToNull(p.expressions)) - val empty2NullPlan = if (hasEmpty2Null) { - plan - } else { - val projectList = V1WritesUtils.convertEmptyToNull(plan.output, partitionColumns) - if (projectList.nonEmpty) ProjectExec(projectList, plan) else plan - } + val projectList = V1WritesUtils.convertEmptyToNull(plan.output, partitionColumns) + val empty2NullPlan = if (projectList.nonEmpty) ProjectExec(projectList, plan) else plan writeAndCommit(job, description, committer) { val (planToExecute, concurrentOutputWriterSpec) = if (orderingMatched) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala index b17d72b0f7207..b1d2588ede627 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala @@ -93,13 +93,8 @@ object V1Writes extends Rule[LogicalPlan] with SQLConfHelper { } private def prepareQuery(write: V1WriteCommand, query: LogicalPlan): LogicalPlan = { - val hasEmpty2Null = query.exists(p => hasEmptyToNull(p.expressions)) - val empty2NullPlan = if (hasEmpty2Null) { - query - } else { - val projectList = convertEmptyToNull(query.output, write.partitionColumns) - if (projectList.isEmpty) query else Project(projectList, query) - } + val projectList = convertEmptyToNull(query.output, write.partitionColumns) + val empty2NullPlan = if (projectList.isEmpty) query else Project(projectList, query) assert(empty2NullPlan.output.length == query.output.length) val attrMap = AttributeMap(query.output.zip(empty2NullPlan.output)) @@ -108,7 +103,6 @@ object V1Writes extends Rule[LogicalPlan] with SQLConfHelper { case a: Attribute => attrMap.getOrElse(a, a) }.asInstanceOf[SortOrder]) val outputOrdering = query.outputOrdering - // Check if the ordering is already matched to ensure the idempotency of the rule. val orderingMatched = isOrderingMatched(requiredOrdering, outputOrdering) if (orderingMatched) { empty2NullPlan