[SPARK-37287][SQL] Pull out dynamic partition and bucket sort from FileFormatWriter #34568
Changes from all commits
30eee7e
858c0ec
d36f2f0
c3f30b4
6593ca3
faa5be9
5ed40c1
121aa24
08e08b4
@@ -34,9 +34,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan, SQLExecution, UnsafeExternalRowSorter}
@@ -47,7 +45,7 @@ import org.apache.spark.util.{SerializableConfiguration, Utils}
/** A helper object for writing FileFormat data out to a location. */
object FileFormatWriter extends Logging {
object FileFormatWriter extends Logging with V1WritesHelper {
  /** Describes how output files should be placed in the filesystem. */
  case class OutputSpec(
      outputPath: String,
@@ -78,6 +76,7 @@ object FileFormatWriter extends Logging {
      maxWriters: Int,
      createSorter: () => UnsafeExternalRowSorter)

  // scalastyle:off argcount
  /**
   * Basic work flow of this command is:
   * 1. Driver side setup, including output committer initialization and data source specific
@@ -100,6 +99,7 @@ object FileFormatWriter extends Logging {
      outputSpec: OutputSpec,
      hadoopConf: Configuration,
      partitionColumns: Seq[Attribute],
      staticPartitionColumns: Seq[Attribute],
      bucketSpec: Option[BucketSpec],
      statsTrackers: Seq[WriteJobStatsTracker],
      options: Map[String, String])
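write() gains a staticPartitionColumns parameter alongside the existing partitionColumns. A hypothetical call site, just to show where the new argument fits: the parameters not visible in this hunk follow the pre-existing write(...) signature, all variable names here are made up, and the real callers (the insert commands) are not part of this hunk.

FileFormatWriter.write(
  sparkSession = spark,
  plan = physicalPlan,
  fileFormat = fileFormat,
  committer = committer,
  outputSpec = outputSpec,
  hadoopConf = hadoopConf,
  partitionColumns = allPartitionColumns,
  // New in this PR. Presumably the statically-specified partition columns, passed so the
  // writer can skip them when building the dynamic-partition sort order (see getSortOrder
  // later in this diff, which takes staticPartitionColumns.size).
  staticPartitionColumns = staticPartitionColumnAttrs,
  bucketSpec = bucketSpec,
  statsTrackers = statsTrackers,
  options = options)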
@@ -126,39 +126,7 @@ object FileFormatWriter extends Logging {
    }
    val empty2NullPlan = if (needConvert) ProjectExec(projectList, plan) else plan

    val writerBucketSpec = bucketSpec.map { spec =>
      val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get)

      if (options.getOrElse(BucketingUtils.optionForHiveCompatibleBucketWrite, "false") ==
          "true") {
        // Hive bucketed table: use `HiveHash` and bitwise-and as bucket id expression.
        // Without the extra bitwise-and operation, we can get wrong bucket id when hash value of
        // columns is negative. See Hive implementation in
        // `org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils#getBucketNumber()`.
        val hashId = BitwiseAnd(HiveHash(bucketColumns), Literal(Int.MaxValue))
        val bucketIdExpression = Pmod(hashId, Literal(spec.numBuckets))

        // The bucket file name prefix is following Hive, Presto and Trino conversion, so this
        // makes sure Hive bucketed table written by Spark, can be read by other SQL engines.
        //
        // Hive: `org.apache.hadoop.hive.ql.exec.Utilities#getBucketIdFromFile()`.
        // Trino: `io.trino.plugin.hive.BackgroundHiveSplitLoader#BUCKET_PATTERNS`.
        val fileNamePrefix = (bucketId: Int) => f"$bucketId%05d_0_"
        WriterBucketSpec(bucketIdExpression, fileNamePrefix)
      } else {
        // Spark bucketed table: use `HashPartitioning.partitionIdExpression` as bucket id
        // expression, so that we can guarantee the data distribution is same between shuffle and
        // bucketed data source, which enables us to only shuffle one side when join a bucketed
        // table and a normal one.
        val bucketIdExpression = HashPartitioning(bucketColumns, spec.numBuckets)
          .partitionIdExpression
        WriterBucketSpec(bucketIdExpression, (_: Int) => "")
      }
    }
    val sortColumns = bucketSpec.toSeq.flatMap {
      spec => spec.sortColumnNames.map(c => dataColumns.find(_.name == c).get)
    }

    val writerBucketSpec = getWriterBucketSpec(bucketSpec, dataColumns, options)
    val caseInsensitiveOptions = CaseInsensitiveMap(options)

    val dataSchema = dataColumns.toStructType
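The bucket handling removed above reappears as shared helpers: the added line calls getWriterBucketSpec, and FileFormatWriter now mixes in V1WritesHelper (see the earlier hunk). A minimal sketch of what those helpers could look like, assuming they simply host the code deleted here; the trait and method names come from this diff, but the exact placement and signatures in the PR may differ.

// Sketch only: assumes WriterBucketSpec stays defined inside FileFormatWriter as before.
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.execution.datasources.BucketingUtils
import org.apache.spark.sql.execution.datasources.FileFormatWriter.WriterBucketSpec

trait V1WritesHelper {

  def getWriterBucketSpec(
      bucketSpec: Option[BucketSpec],
      dataColumns: Seq[Attribute],
      options: Map[String, String]): Option[WriterBucketSpec] = {
    bucketSpec.map { spec =>
      val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get)
      if (options.getOrElse(BucketingUtils.optionForHiveCompatibleBucketWrite, "false") == "true") {
        // Hive-compatible bucketing: HiveHash plus a bitwise-and so negative hash values
        // cannot produce a wrong bucket id, and a Hive/Trino-style file name prefix.
        val hashId = BitwiseAnd(HiveHash(bucketColumns), Literal(Int.MaxValue))
        WriterBucketSpec(
          Pmod(hashId, Literal(spec.numBuckets)),
          (bucketId: Int) => f"$bucketId%05d_0_")
      } else {
        // Spark bucketing: reuse HashPartitioning's partition id expression so shuffle and
        // bucketed writes agree on the data distribution.
        WriterBucketSpec(
          HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression,
          (_: Int) => "")
      }
    }
  }

  def getBucketSortColumns(
      bucketSpec: Option[BucketSpec],
      dataColumns: Seq[Attribute]): Seq[Attribute] = {
    bucketSpec.toSeq.flatMap { spec =>
      spec.sortColumnNames.map(c => dataColumns.find(_.name == c).get)
    }
  }
}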
@@ -184,20 +152,6 @@ object FileFormatWriter extends Logging {
      statsTrackers = statsTrackers
    )

    // We should first sort by partition columns, then bucket id, and finally sorting columns.
    val requiredOrdering =
      partitionColumns ++ writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
    // the sort order doesn't matter
    val actualOrdering = empty2NullPlan.outputOrdering.map(_.child)
Contributor (Author): There is an issue here, since we have AQE: the plan is the … This PR can resolve this issue together. @cloud-fan @c21
    val orderingMatched = if (requiredOrdering.length > actualOrdering.length) {
      false
    } else {
      requiredOrdering.zip(actualOrdering).forall {
        case (requiredOrder, childOutputOrder) =>
          requiredOrder.semanticEquals(childOutputOrder)
      }
    }

    SQLExecution.checkSQLExecutionId(sparkSession)

    // propagate the description UUID into the jobs, so that committers
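The required ordering computed above goes away; later in this diff the sorter for concurrent writers obtains its ordering from getSortOrder(output, partitionColumns, staticPartitionColumns.size, bucketSpec, options) instead. A hedged sketch of such a helper: the signature is taken from that call, but the body, and in particular the assumption that the leading static partition columns are a constant prefix that can be dropped from the ordering, are illustrative guesses rather than the PR's actual implementation.

def getSortOrder(
    output: Seq[Attribute],
    partitionColumns: Seq[Attribute],
    staticPartitionColumnsSize: Int,
    bucketSpec: Option[BucketSpec],
    options: Map[String, String]): Seq[SortOrder] = {
  val dataColumns = output.filterNot(partitionColumns.contains)
  val writerBucketSpec = getWriterBucketSpec(bucketSpec, dataColumns, options)
  val sortColumns = getBucketSortColumns(bucketSpec, dataColumns)
  // Same rule as the code removed above: partition columns first, then bucket id expression,
  // then the bucket sort columns -- except that statically-specified partition columns
  // (assumed to be a constant prefix) do not need to participate in the sort.
  val requiredOrdering = partitionColumns.drop(staticPartitionColumnsSize) ++
    writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
  // As the removed comment notes, the sort direction itself does not matter to the writer.
  requiredOrdering.map(SortOrder(_, Ascending))
}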
@@ -208,29 +162,26 @@ object FileFormatWriter extends Logging {
    // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
    committer.setupJob(job)

    val sortColumns = getBucketSortColumns(bucketSpec, dataColumns)
    try {
      val (rdd, concurrentOutputWriterSpec) = if (orderingMatched) {
        (empty2NullPlan.execute(), None)
      val maxWriters = sparkSession.sessionState.conf.maxConcurrentOutputFileWriters
      val concurrentWritersEnabled = maxWriters > 0 && sortColumns.isEmpty
      val concurrentOutputWriterSpec = if (concurrentWritersEnabled) {
        val output = empty2NullPlan.output
        val enableRadixSort = sparkSession.sessionState.conf.enableRadixSort
        val outputSchema = empty2NullPlan.schema
        Some(ConcurrentOutputWriterSpec(maxWriters,
          () => SortExec.createSorter(
Contributor: I feel this refactoring (…

Contributor (Author): Look at the previous code: we create and evaluate a SortExec mainly to get the ordering for dynamic partitions. For the concurrent writers, we only need the sorter. Now that we pull out the sort, creating a new SortExec seems overkill.
            getSortOrder(output, partitionColumns, staticPartitionColumns.size,
              bucketSpec, options),
            output,
            outputSchema,
            enableRadixSort
          )))
      } else {
        // SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and
        // the physical plan may have different attribute ids due to optimizer removing some
        // aliases. Here we bind the expression ahead to avoid potential attribute ids mismatch.
        val orderingExpr = bindReferences(
          requiredOrdering.map(SortOrder(_, Ascending)), finalOutputSpec.outputColumns)
        val sortPlan = SortExec(
          orderingExpr,
          global = false,
          child = empty2NullPlan)

        val maxWriters = sparkSession.sessionState.conf.maxConcurrentOutputFileWriters
        val concurrentWritersEnabled = maxWriters > 0 && sortColumns.isEmpty
        if (concurrentWritersEnabled) {
          (empty2NullPlan.execute(),
            Some(ConcurrentOutputWriterSpec(maxWriters, () => sortPlan.createSorter())))
        } else {
          (sortPlan.execute(), None)
        }
        None
      }
      val rdd = empty2NullPlan.execute()

      // SPARK-23271 If we are attempting to write a zero partition rdd, create a dummy single
      // partition rdd to make sure we at least set up one write task to write the metadata.
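Because the inline review comments break this hunk up, here is a condensed restatement of the new control flow in write(): no SortExec is built here any more, and a sorter factory is only wired up for the concurrent-writers fallback. This is an illustrative paraphrase of the patched code above, not the literal diff.

val sortColumns = getBucketSortColumns(bucketSpec, dataColumns)
val maxWriters = sparkSession.sessionState.conf.maxConcurrentOutputFileWriters
// Concurrent writers are only considered when enabled and there are no bucket sort columns.
val concurrentOutputWriterSpec =
  if (maxWriters > 0 && sortColumns.isEmpty) {
    val output = empty2NullPlan.output
    Some(ConcurrentOutputWriterSpec(
      maxWriters,
      // Created lazily per task, only if a task has to fall back from concurrent writing
      // to sort-based writing.
      () => SortExec.createSorter(
        getSortOrder(output, partitionColumns, staticPartitionColumns.size, bucketSpec, options),
        output,
        empty2NullPlan.schema,
        sparkSession.sessionState.conf.enableRadixSort)))
  } else {
    None
  }
// The incoming plan is executed as-is: any required dynamic-partition/bucket sort is assumed
// to have been planned upstream (the point of this PR), so orderingMatched is no longer checked.
val rdd = empty2NullPlan.execute()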
@@ -278,6 +229,7 @@ object FileFormatWriter extends Logging {
        throw QueryExecutionErrors.jobAbortedError(cause)
      }
    }
  // scalastyle:on argcount

  /** Writes data out in a single Spark task. */
  private def executeTask(
This change is because maxConcurrentOutputFileWriters needs to create a sorter in FileFormatWriter.