[SPARK-40354][SQL] Support eliminate dynamic partition for datasource v1 writes #37831
Changes from all commits:
In `org.apache.spark.sql.execution.datasources` (the `V1Writes` rule):

```diff
@@ -17,11 +17,15 @@
 package org.apache.spark.sql.execution.datasources
 
+import scala.annotation.tailrec
+import scala.collection.mutable
+
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
-import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeMap, AttributeSet, BitwiseAnd, Expression, HiveHash, Literal, NamedExpression, Pmod, SortOrder, String2StringExpression, UnaryExpression}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, ExternalCatalogUtils}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeMap, AttributeSet, BitwiseAnd, Cast, Expression, HiveHash, Literal, NamedExpression, Pmod, SortOrder, String2StringExpression, UnaryExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, GlobalLimit, LocalLimit, LogicalPlan, Project, RebalancePartitions, RepartitionOperation, Sort}
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.command.DataWritingCommand
```
```diff
@@ -30,6 +34,16 @@ import org.apache.spark.sql.types.StringType
 import org.apache.spark.unsafe.types.UTF8String
 
 trait V1WriteCommand extends DataWritingCommand {
+  /**
+   * Specify the static partitions of the V1 write command.
+   */
+  def staticPartitions: Map[String, String]
+
+  /**
+   * The number of static partition columns in `partitionColumns`.
+   * Note that, the static partition must be ahead of partition columns.
+   */
+  final def numStaticPartitionCols: Int = staticPartitions.size
+
   /**
    * Specify the partition columns of the V1 write command.
```
```diff
@@ -41,6 +55,11 @@ trait V1WriteCommand extends DataWritingCommand {
    * add SortExec if necessary when the requiredOrdering is empty.
    */
   def requiredOrdering: Seq[SortOrder]
+
+  /**
+   * Replace the static partition spec for the V1 write command.
+   */
+  def withNewStaticPartitionSpec(partitionSpec: Map[String, String]): V1WriteCommand
 }
 
 /**
```
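For orientation, here is what a concrete command implementing the extended trait could look like. This is a hedged sketch, not code from this PR: `ExampleInsertCommand` and its fields are invented, and real implementors (the v1 file-source and Hive insert commands) have further members to satisfy; only the new trait contract is illustrated.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.V1WriteCommand

// Hypothetical V1WriteCommand implementor (illustration only).
case class ExampleInsertCommand(
    query: LogicalPlan,
    partitionColumns: Seq[Attribute],
    staticPartitions: Map[String, String]) extends V1WriteCommand {

  override def outputColumnNames: Seq[String] = query.output.map(_.name)

  // Only the dynamic tail needs an ordering: the static prefix
  // (numStaticPartitionCols columns) each has a single, fixed value.
  override def requiredOrdering: Seq[SortOrder] =
    partitionColumns.drop(numStaticPartitionCols).map(SortOrder(_, Ascending))

  // EliminateV1DynamicPartitionWrites calls this with the merged spec.
  override def withNewStaticPartitionSpec(
      partitionSpec: Map[String, String]): V1WriteCommand =
    copy(staticPartitions = partitionSpec)

  override def run(session: SparkSession, child: SparkPlan): Seq[Row] = {
    // A real command would write out `child`'s rows here.
    Seq.empty
  }
}
```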
```diff
@@ -102,6 +121,83 @@ object V1Writes extends Rule[LogicalPlan] with SQLConfHelper {
   }
 }
 
+/**
+ * This rule is used to eliminate dynamic partition to static partition for v1 writes if the
+ * partition columns is foldable, so that we can avoid unnecessary sort for dynamic partition.
+ *
+ * For example, a pure SQL:
+ * {{{
+ *   INSERT INTO TABLE t1 PARTITION(p) SELECT c, 'a' as p FROM t2
+ *   =>
+ *   INSERT INTO TABLE t1 PARTITION(p='a') SELECT c FROM t2
+ * }}}
+ */
+object EliminateV1DynamicPartitionWrites extends Rule[LogicalPlan] {
+
+  @tailrec
+  private def queryOutput(p: LogicalPlan): Seq[NamedExpression] = p match {
+    case p: Project => p.projectList
+    case f: Filter => queryOutput(f.child)
+    case r: RepartitionOperation => queryOutput(r.child)
+    case r: RebalancePartitions => queryOutput(r.child)
+    case s: Sort => queryOutput(s.child)
+    case l: LocalLimit => queryOutput(l.child)
+    case l: GlobalLimit => queryOutput(l.child)
+    case _ => Seq.empty
+  }
+
+  private def getPartitionSpecString(part: Any): String = {
+    if (part == null) {
+      null
+    } else {
+      assert(part.isInstanceOf[UTF8String])
+      ExternalCatalogUtils.getPartitionSpecString(part.asInstanceOf[UTF8String].toString)
+    }
+  }
```

> **Inline review comment** (on `getPartitionSpecString`): null and empty partition values are treated as null, and the final partition path will be `__HIVE_DEFAULT_PARTITION__`.
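As a quick illustration of the convention that comment describes, the existing helper `ExternalCatalogUtils.getPartitionPathString` already renders partition paths this way (the `getPartitionSpecString` call in the diff is assumed to follow the same null/empty handling):

```scala
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils

// Null and empty partition values both collapse to the default
// partition name when the partition path is rendered.
ExternalCatalogUtils.getPartitionPathString("p", "a")  // "p=a"
ExternalCatalogUtils.getPartitionPathString("p", "")   // "p=__HIVE_DEFAULT_PARTITION__"
ExternalCatalogUtils.getPartitionPathString("p", null) // "p=__HIVE_DEFAULT_PARTITION__"
```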
The hunk continues:

```diff
+  private def tryEvalStaticPartition(named: NamedExpression): Option[(String, String)] = {
+    named match {
+      case Alias(l: Literal, name) =>
+        Some((name, getPartitionSpecString(
+          Cast(l, StringType, Option(conf.sessionLocalTimeZone)).eval())))
+      case _ => None
+    }
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    if (!conf.eliminateDynamicPartitionWrites) {
+      return plan
+    }
+
+    val resolver = SparkSession.active.sessionState.analyzer.resolver
+    plan.transformDown {
+      case v1Writes: V1WriteCommand =>
+        val output = queryOutput(v1Writes.query)
+
+        // We can not infer a static partition which after a dynamic partition column,
+        // for example:
+        //   INSERT INTO TABLE t PARTITION BY(p1, p2)
+        //   SELECT c, p1, 'a' as p2
+        var previousStaticPartition = true
+        val newStaticPartitionSpec = new mutable.HashMap[String, String]()
+        val it = v1Writes.partitionColumns.drop(v1Writes.numStaticPartitionCols)
+          .map(attr => output.find(o => resolver(attr.name, o.name))).iterator
+        while (previousStaticPartition && it.hasNext) {
+          it.next().flatMap(part => tryEvalStaticPartition(part)) match {
+            case Some((name, partitionValue)) => newStaticPartitionSpec.put(name, partitionValue)
+            case None => previousStaticPartition = false
+          }
+        }
+
+        if (newStaticPartitionSpec.nonEmpty) {
+          v1Writes.withNewStaticPartitionSpec(v1Writes.staticPartitions ++ newStaticPartitionSpec)
+        } else {
+          v1Writes
+        }
+    }
+  }
+}
+
 object V1WritesUtils {
 
   /** A function that converts the empty string to null for partition values. */
```
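Note the `Cast` with the session time zone in `tryEvalStaticPartition`: a static partition spec is a string map, so the literal must be rendered the way Spark renders partition values. A spark-shell style sketch of the folding step (`"UTC"` stands in here for `conf.sessionLocalTimeZone`):

```scala
import java.sql.Date

import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.StringType

// A foldable literal evaluates without an input row; casting it to
// StringType yields the text placed in the static partition spec.
val intPart = Cast(Literal(42), StringType, Option("UTC")).eval()
// intPart: Any = 42 (as a UTF8String)

val datePart = Cast(Literal(Date.valueOf("2022-09-06")), StringType, Option("UTC")).eval()
// datePart: Any = 2022-09-06; the time zone argument matters once
// timestamp literals are involved.
```

Anything that is not an `Alias` over a `Literal` returns `None`, which stops the scan so that no static partition is inferred after a dynamic one.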
And the test update in `DDLSuite`:

```diff
@@ -1539,7 +1539,7 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase {
       assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
       assert(spark.sql(s"SHOW PARTITIONS $tableName").count() == 0)
       spark.sql(
-        s"INSERT INTO TABLE $tableName PARTITION(c='c', b) SELECT *, 'b' FROM t WHERE 1 = 0")
+        s"INSERT INTO TABLE $tableName PARTITION(c='c', b) SELECT *, a FROM t WHERE 1 = 0")
       assert(spark.sql(s"SHOW PARTITIONS $tableName").count() == 0)
       assert(!new File(dir, "c=c/b=b").exists())
       checkAnswer(spark.table(tableName), Nil)
```

The literal `'b'` becomes the column reference `a` so that partition `b` stays genuinely dynamic: with the new rule enabled, the literal would be folded into a static partition spec, and a static-partition insert materializes the partition even when the query returns no rows, which this test asserts must not happen.
**Reviewer:**

> Can you explain more about the difference between writes with dynamic partition columns and writes without them? The code can be simplified quite a lot if we just need to remove the sort.

**Author:**

> I think the main benefit is that we can save a local sort. Yes, I agree the current implementation is overly complex. The reason is that I guess some downstream projects or extensions may depend on the static partitions (e.g. adding a repartition for dynamic partition writes), so I merge the inferred static partitions into the original ones. I'm fine with simplifying the code to just remove the sort.

**Reviewer:**

> Downstream projects can detect the real dynamic partition columns by themselves. I'd prefer a simple solution here that just removes the local sort.
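To make the trade-off being discussed concrete: the sort that dynamic partition writes require only covers the columns that remain dynamic, so folding every dynamic partition value into a literal removes the sort entirely. A simplified, self-contained sketch of that relationship (a model of the idea, not the actual Spark code):

```scala
// Dynamic partition columns must be sorted so each task writes one
// partition at a time; static columns never need sorting.
def requiredSortColumns(
    partitionColumns: Seq[String],
    staticPartitions: Map[String, String]): Seq[String] =
  partitionColumns.filterNot(staticPartitions.contains)

// All partition values folded to literals => nothing left to sort:
assert(requiredSortColumns(Seq("p1", "p2"), Map("p1" -> "a", "p2" -> "b")).isEmpty)

// Only a prefix folded => the dynamic tail still forces a local sort:
assert(requiredSortColumns(Seq("p1", "p2"), Map("p1" -> "a")) == Seq("p2"))
```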