Commit 2562274

allisonwang-db authored and cloud-fan committed
[SPARK-37287][SQL] Pull out dynamic partition and bucket sort from FileFormatWriter
### What changes were proposed in this pull request?

`FileFormatWriter.write` is used by all V1 write commands, including data source and Hive tables. Depending on the dynamic partition, bucket, and sort columns of the V1 write command, `FileFormatWriter` can add a physical sort on top of the query plan that is not directly visible in the plan. This PR (based on #34568) pulls the physical sort added by `FileFormatWriter` out into logical planning. It adds a new logical rule `V1Writes` that inserts logical Sort operators based on the required ordering of a V1 write command. This behavior can be controlled by the new config **spark.sql.optimizer.plannedWrite.enabled** (default: true).

### Why are the changes needed?

Improve the observability of V1 writes, and unify the logic of V1 and V2 write commands.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

New unit tests.

Closes #37099 from allisonwang-db/spark-37287-v1-writes.

Authored-by: allisonwang-db <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 2ef82ad commit 2562274
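For context, here is a minimal spark-shell-style sketch of how the new behavior can be exercised from the public API. The table name `pw_demo`, the column names, and the session setup are illustrative assumptions, not part of this PR:

```scala
// Illustrative sketch only: names and setup are made up for the example.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Default is true; shown explicitly for clarity.
spark.conf.set("spark.sql.optimizer.plannedWrite.enabled", "true")

spark.sql("CREATE TABLE pw_demo (id INT, p INT) USING parquet PARTITIONED BY (p)")

// With planned writes enabled, the optimized logical plan of this insert should
// contain a Sort on the dynamic partition column `p`, instead of an invisible
// physical sort added later by FileFormatWriter.
spark.range(100)
  .selectExpr("CAST(id AS INT) AS id", "CAST(id % 10 AS INT) AS p")
  .write.mode("append").insertInto("pw_demo")
```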

File tree: 15 files changed, +703, -126 lines

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 10 additions & 0 deletions
@@ -412,6 +412,14 @@ object SQLConf {
     .longConf
     .createWithDefault(67108864L)

+  val PLANNED_WRITE_ENABLED = buildConf("spark.sql.optimizer.plannedWrite.enabled")
+    .internal()
+    .doc("When set to true, Spark optimizer will add logical sort operators to V1 write commands " +
+      "if needed so that `FileFormatWriter` does not need to insert physical sorts.")
+    .version("3.4.0")
+    .booleanConf
+    .createWithDefault(true)
+
   val COMPRESS_CACHED = buildConf("spark.sql.inMemoryColumnarStorage.compressed")
     .doc("When set to true Spark SQL will automatically select a compression codec for each " +
       "column based on statistics of the data.")
@@ -4617,6 +4625,8 @@ class SQLConf extends Serializable with Logging {

   def maxConcurrentOutputFileWriters: Int = getConf(SQLConf.MAX_CONCURRENT_OUTPUT_FILE_WRITERS)

+  def plannedWriteEnabled: Boolean = getConf(SQLConf.PLANNED_WRITE_ENABLED)
+
   def inferDictAsStruct: Boolean = getConf(SQLConf.INFER_NESTED_DICT_AS_STRUCT)

   def legacyInferArrayTypeFromFirstElement: Boolean = getConf(

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala

Lines changed: 2 additions & 2 deletions
@@ -23,8 +23,7 @@ import org.apache.spark.sql.catalyst.optimizer._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.CatalogManager
-import org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions
-import org.apache.spark.sql.execution.datasources.SchemaPruning
+import org.apache.spark.sql.execution.datasources.{PruneFileSourcePartitions, SchemaPruning, V1Writes}
 import org.apache.spark.sql.execution.datasources.v2.{GroupBasedRowLevelOperationScanPlanning, OptimizeMetadataOnlyDeleteFromTable, V2ScanPartitioningAndOrdering, V2ScanRelationPushDown, V2Writes}
 import org.apache.spark.sql.execution.dynamicpruning.{CleanupDynamicPruningFilters, PartitionPruning}
 import org.apache.spark.sql.execution.python.{ExtractGroupingPythonUDFFromAggregate, ExtractPythonUDFFromAggregate, ExtractPythonUDFs}
@@ -39,6 +38,7 @@ class SparkOptimizer(
     // TODO: move SchemaPruning into catalyst
     Seq(SchemaPruning) :+
       GroupBasedRowLevelOperationScanPlanning :+
+      V1Writes :+
       V2ScanRelationPushDown :+
       V2ScanPartitioningAndOrdering :+
       V2Writes :+

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala

Lines changed: 3 additions & 1 deletion
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern._
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedCommandExec}
+import org.apache.spark.sql.execution.datasources.V1WriteCommand
 import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
 import org.apache.spark.sql.execution.exchange.Exchange
 import org.apache.spark.sql.internal.SQLConf
@@ -46,8 +47,9 @@ case class InsertAdaptiveSparkPlan(
     case _ if !conf.adaptiveExecutionEnabled => plan
     case _: ExecutedCommandExec => plan
     case _: CommandResultExec => plan
-    case c: DataWritingCommandExec => c.copy(child = apply(c.child))
     case c: V2CommandExec => c.withNewChildren(c.children.map(apply))
+    case c: DataWritingCommandExec if !c.cmd.isInstanceOf[V1WriteCommand] =>
+      c.copy(child = apply(c.child))
     case _ if shouldApplyAQE(plan, isSubquery) =>
       if (supportAdaptive(plan)) {
         try {

sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala

Lines changed: 14 additions & 1 deletion
@@ -20,7 +20,9 @@ package org.apache.spark.sql.execution.command
 import java.net.URI

 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.expressions.SortOrder
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -141,7 +143,18 @@ case class CreateDataSourceTableAsSelectCommand(
     mode: SaveMode,
     query: LogicalPlan,
     outputColumnNames: Seq[String])
-  extends DataWritingCommand {
+  extends V1WriteCommand {
+
+  override def requiredOrdering: Seq[SortOrder] = {
+    val unresolvedPartitionColumns = table.partitionColumnNames.map(UnresolvedAttribute.quoted)
+    val partitionColumns = DataSource.resolvePartitionColumns(
+      unresolvedPartitionColumns,
+      outputColumns,
+      query,
+      SparkSession.active.sessionState.conf.resolver)
+    val options = table.storage.properties
+    V1WritesUtils.getSortOrder(outputColumns, partitionColumns, table.bucketSpec, options)
+  }

   override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
     assert(table.tableType != CatalogTableType.VIEW)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 26 additions & 13 deletions
@@ -29,8 +29,9 @@ import org.apache.spark.SparkException
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogUtils}
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, TypeUtils}
 import org.apache.spark.sql.connector.catalog.TableProvider
@@ -519,18 +520,8 @@ case class DataSource(
       case format: FileFormat =>
         disallowWritingIntervals(outputColumns.map(_.dataType), forbidAnsiIntervals = false)
         val cmd = planForWritingFileFormat(format, mode, data)
-        val resolvedPartCols = cmd.partitionColumns.map { col =>
-          // The partition columns created in `planForWritingFileFormat` should always be
-          // `UnresolvedAttribute` with a single name part.
-          assert(col.isInstanceOf[UnresolvedAttribute])
-          val unresolved = col.asInstanceOf[UnresolvedAttribute]
-          assert(unresolved.nameParts.length == 1)
-          val name = unresolved.nameParts.head
-          outputColumns.find(a => equality(a.name, name)).getOrElse {
-            throw QueryCompilationErrors.cannotResolveAttributeError(
-              name, data.output.map(_.name).mkString(", "))
-          }
-        }
+        val resolvedPartCols =
+          DataSource.resolvePartitionColumns(cmd.partitionColumns, outputColumns, data, equality)
         val resolved = cmd.copy(
           partitionColumns = resolvedPartCols,
           outputColumnNames = outputColumnNames)
@@ -836,4 +827,26 @@ object DataSource extends Logging {
       throw QueryCompilationErrors.writeEmptySchemasUnsupportedByDataSourceError()
     }
   }
+
+  /**
+   * Resolve partition columns using output columns of the query plan.
+   */
+  def resolvePartitionColumns(
+      partitionColumns: Seq[Attribute],
+      outputColumns: Seq[Attribute],
+      plan: LogicalPlan,
+      resolver: Resolver): Seq[Attribute] = {
+    partitionColumns.map { col =>
+      // The partition columns created in `planForWritingFileFormat` should always be
+      // `UnresolvedAttribute` with a single name part.
+      assert(col.isInstanceOf[UnresolvedAttribute])
+      val unresolved = col.asInstanceOf[UnresolvedAttribute]
+      assert(unresolved.nameParts.length == 1)
+      val name = unresolved.nameParts.head
+      outputColumns.find(a => resolver(a.name, name)).getOrElse {
+        throw QueryCompilationErrors.cannotResolveAttributeError(
+          name, plan.output.map(_.name).mkString(", "))
+      }
+    }
+  }
 }
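As a quick illustration of the extracted helper, here is a hedged test-style sketch. The attribute names, the `LocalRelation`, and the choice of `caseInsensitiveResolution` are assumptions made for the example, assuming Spark's catalyst and SQL internals are on the classpath:

```scala
import org.apache.spark.sql.catalyst.analysis.{caseInsensitiveResolution, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.types.{IntegerType, StringType}

val id = AttributeReference("id", IntegerType)()
val part = AttributeReference("P", StringType)()
val plan = LocalRelation(id, part)

// The partition column is referenced in lower case; the resolver decides
// whether it still matches the output column named `P`.
val resolved = DataSource.resolvePartitionColumns(
  Seq(UnresolvedAttribute.quoted("p")),
  plan.output,
  plan,
  caseInsensitiveResolution)

// With a case-insensitive resolver this resolves to the `P` attribute; with a
// case-sensitive resolver it would throw cannotResolveAttributeError instead.
println(resolved.map(_.name))  // List(P)
```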

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala

Lines changed: 18 additions & 33 deletions
@@ -36,7 +36,6 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
-import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan, SQLExecution, UnsafeExternalRowSorter}
@@ -78,6 +77,12 @@ object FileFormatWriter extends Logging {
       maxWriters: Int,
       createSorter: () => UnsafeExternalRowSorter)

+  /**
+   * A variable used in tests to check whether the output ordering of the query matches the
+   * required ordering of the write command.
+   */
+  private[sql] var outputOrderingMatched: Boolean = false
+
   /**
    * Basic work flow of this command is:
    * 1. Driver side setup, including output committer initialization and data source specific
@@ -126,38 +131,8 @@ object FileFormatWriter extends Logging {
     }
     val empty2NullPlan = if (needConvert) ProjectExec(projectList, plan) else plan

-    val writerBucketSpec = bucketSpec.map { spec =>
-      val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get)
-
-      if (options.getOrElse(BucketingUtils.optionForHiveCompatibleBucketWrite, "false") ==
-        "true") {
-        // Hive bucketed table: use `HiveHash` and bitwise-and as bucket id expression.
-        // Without the extra bitwise-and operation, we can get wrong bucket id when hash value of
-        // columns is negative. See Hive implementation in
-        // `org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils#getBucketNumber()`.
-        val hashId = BitwiseAnd(HiveHash(bucketColumns), Literal(Int.MaxValue))
-        val bucketIdExpression = Pmod(hashId, Literal(spec.numBuckets))
-
-        // The bucket file name prefix is following Hive, Presto and Trino conversion, so this
-        // makes sure Hive bucketed table written by Spark, can be read by other SQL engines.
-        //
-        // Hive: `org.apache.hadoop.hive.ql.exec.Utilities#getBucketIdFromFile()`.
-        // Trino: `io.trino.plugin.hive.BackgroundHiveSplitLoader#BUCKET_PATTERNS`.
-        val fileNamePrefix = (bucketId: Int) => f"$bucketId%05d_0_"
-        WriterBucketSpec(bucketIdExpression, fileNamePrefix)
-      } else {
-        // Spark bucketed table: use `HashPartitioning.partitionIdExpression` as bucket id
-        // expression, so that we can guarantee the data distribution is same between shuffle and
-        // bucketed data source, which enables us to only shuffle one side when join a bucketed
-        // table and a normal one.
-        val bucketIdExpression = HashPartitioning(bucketColumns, spec.numBuckets)
-          .partitionIdExpression
-        WriterBucketSpec(bucketIdExpression, (_: Int) => "")
-      }
-    }
-    val sortColumns = bucketSpec.toSeq.flatMap {
-      spec => spec.sortColumnNames.map(c => dataColumns.find(_.name == c).get)
-    }
+    val writerBucketSpec = V1WritesUtils.getWriterBucketSpec(bucketSpec, dataColumns, options)
+    val sortColumns = V1WritesUtils.getBucketSortColumns(bucketSpec, dataColumns)

     val caseInsensitiveOptions = CaseInsensitiveMap(options)

@@ -209,6 +184,16 @@ object FileFormatWriter extends Logging {
     // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
     committer.setupJob(job)

+    // When `PLANNED_WRITE_ENABLED` is true, the optimizer rule V1Writes will add logical sort
+    // operator based on the required ordering of the V1 write command. So the output
+    // ordering of the physical plan should always match the required ordering. Here
+    // we set the variable to verify this behavior in tests.
+    // There are two cases where FileFormatWriter still needs to add physical sort:
+    // 1) When the planned write config is disabled.
+    // 2) When the concurrent writers are enabled (in this case the required ordering of a
+    //    V1 write command will be empty).
+    if (Utils.isTesting) outputOrderingMatched = orderingMatched
+
     try {
       val (rdd, concurrentOutputWriterSpec) = if (orderingMatched) {
         (empty2NullPlan.execute(), None)
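A hedged sketch of how a test might observe the new `outputOrderingMatched` flag. The object name, table name, and session wiring are assumptions; the flag is only assigned when `Utils.isTesting` is true (i.e. the `spark.testing` system property is set), and the snippet must sit under the `org.apache.spark.sql` package so the `private[sql]` member is visible:

```scala
package org.apache.spark.sql

import org.apache.spark.sql.execution.datasources.FileFormatWriter

// Test-style sketch, not an actual test from this PR.
object PlannedWriteOrderingCheck {
  def run(spark: SparkSession): Unit = {
    spark.conf.set("spark.sql.optimizer.plannedWrite.enabled", "true")
    spark.sql("CREATE TABLE pw_check (id INT, p INT) USING parquet PARTITIONED BY (p)")
    spark.range(10)
      .selectExpr("CAST(id AS INT) AS id", "CAST(id % 3 AS INT) AS p")
      .write.insertInto("pw_check")
    // The logical Sort inserted by V1Writes should already satisfy the required
    // ordering, so FileFormatWriter reports that no physical sort was needed.
    assert(FileFormatWriter.outputOrderingMatched)
  }
}
```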

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala

Lines changed: 5 additions & 2 deletions
@@ -24,7 +24,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTablePartition}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
@@ -57,7 +57,7 @@ case class InsertIntoHadoopFsRelationCommand(
     catalogTable: Option[CatalogTable],
     fileIndex: Option[FileIndex],
     outputColumnNames: Seq[String])
-  extends DataWritingCommand {
+  extends V1WriteCommand {

   private lazy val parameters = CaseInsensitiveMap(options)

@@ -74,6 +74,9 @@ case class InsertIntoHadoopFsRelationCommand(
       staticPartitions.size < partitionColumns.length
     }

+  override def requiredOrdering: Seq[SortOrder] =
+    V1WritesUtils.getSortOrder(outputColumns, partitionColumns, bucketSpec, options)
+
   override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
     // Most formats don't do well with duplicate columns, so lets not allow that
     SchemaUtils.checkColumnNameDuplication(
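To make the shape of `requiredOrdering` concrete, here is a small sketch that calls the new utility directly. The attributes and the bucket spec are made-up values for illustration, assuming the Spark SQL internal classes are on the classpath:

```scala
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.execution.datasources.V1WritesUtils
import org.apache.spark.sql.types.{IntegerType, LongType, StringType}

// A table partitioned by `p`, bucketed by `id` into 8 buckets, sorted by `ts`.
val id = AttributeReference("id", IntegerType)()
val ts = AttributeReference("ts", LongType)()
val p  = AttributeReference("p", StringType)()

val ordering = V1WritesUtils.getSortOrder(
  outputColumns = Seq(id, ts, p),
  partitionColumns = Seq(p),
  bucketSpec = Some(BucketSpec(8, Seq("id"), Seq("ts"))),
  options = Map.empty)

// Expected shape: dynamic partition column first, then the bucket id expression,
// then the bucket sort column, all ascending: p, pmod(hash(id), 8), ts.
ordering.foreach(o => println(o.sql))
```

With planned writes enabled, `V1Writes` turns exactly this ordering into a logical `Sort(requiredOrdering, global = false, query)` on top of the write's query.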
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala (new file)

Lines changed: 137 additions & 0 deletions
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeSet, BitwiseAnd, HiveHash, Literal, Pmod, SortOrder}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort}
+import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.DataWritingCommand
+import org.apache.spark.sql.internal.SQLConf
+
+trait V1WriteCommand extends DataWritingCommand {
+  // Specify the required ordering for the V1 write command. `FileFormatWriter` will
+  // add SortExec if necessary when the requiredOrdering is empty.
+  def requiredOrdering: Seq[SortOrder]
+}
+
+/**
+ * A rule that adds logical sorts to V1 data writing commands.
+ */
+object V1Writes extends Rule[LogicalPlan] with SQLConfHelper {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    if (conf.plannedWriteEnabled) {
+      plan.transformDown {
+        case write: V1WriteCommand =>
+          val newQuery = prepareQuery(write, write.query)
+          write.withNewChildren(newQuery :: Nil)
+      }
+    } else {
+      plan
+    }
+  }
+
+  private def prepareQuery(write: V1WriteCommand, query: LogicalPlan): LogicalPlan = {
+    val requiredOrdering = write.requiredOrdering
+    val outputOrdering = query.outputOrdering
+    // Check if the ordering is already matched. It is needed to ensure the
+    // idempotency of the rule.
+    val orderingMatched = if (requiredOrdering.length > outputOrdering.length) {
+      false
+    } else {
+      requiredOrdering.zip(outputOrdering).forall {
+        case (requiredOrder, outputOrder) => requiredOrder.semanticEquals(outputOrder)
+      }
+    }
+    if (orderingMatched) {
+      query
+    } else {
+      Sort(requiredOrdering, global = false, query)
+    }
+  }
+}
+
+object V1WritesUtils {
+
+  def getWriterBucketSpec(
+      bucketSpec: Option[BucketSpec],
+      dataColumns: Seq[Attribute],
+      options: Map[String, String]): Option[WriterBucketSpec] = {
+    bucketSpec.map { spec =>
+      val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get)
+
+      if (options.getOrElse(BucketingUtils.optionForHiveCompatibleBucketWrite, "false") ==
+        "true") {
+        // Hive bucketed table: use `HiveHash` and bitwise-and as bucket id expression.
+        // Without the extra bitwise-and operation, we can get wrong bucket id when hash value of
+        // columns is negative. See Hive implementation in
+        // `org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils#getBucketNumber()`.
+        val hashId = BitwiseAnd(HiveHash(bucketColumns), Literal(Int.MaxValue))
+        val bucketIdExpression = Pmod(hashId, Literal(spec.numBuckets))
+
+        // The bucket file name prefix is following Hive, Presto and Trino conversion, so this
+        // makes sure Hive bucketed table written by Spark, can be read by other SQL engines.
+        //
+        // Hive: `org.apache.hadoop.hive.ql.exec.Utilities#getBucketIdFromFile()`.
+        // Trino: `io.trino.plugin.hive.BackgroundHiveSplitLoader#BUCKET_PATTERNS`.
+        val fileNamePrefix = (bucketId: Int) => f"$bucketId%05d_0_"
+        WriterBucketSpec(bucketIdExpression, fileNamePrefix)
+      } else {
+        // Spark bucketed table: use `HashPartitioning.partitionIdExpression` as bucket id
+        // expression, so that we can guarantee the data distribution is same between shuffle and
+        // bucketed data source, which enables us to only shuffle one side when join a bucketed
+        // table and a normal one.
+        val bucketIdExpression = HashPartitioning(bucketColumns, spec.numBuckets)
+          .partitionIdExpression
+        WriterBucketSpec(bucketIdExpression, (_: Int) => "")
+      }
+    }
+  }
+
+  def getBucketSortColumns(
+      bucketSpec: Option[BucketSpec],
+      dataColumns: Seq[Attribute]): Seq[Attribute] = {
+    bucketSpec.toSeq.flatMap {
+      spec => spec.sortColumnNames.map(c => dataColumns.find(_.name == c).get)
+    }
+  }
+
+  def getSortOrder(
+      outputColumns: Seq[Attribute],
+      partitionColumns: Seq[Attribute],
+      bucketSpec: Option[BucketSpec],
+      options: Map[String, String]): Seq[SortOrder] = {
+    val partitionSet = AttributeSet(partitionColumns)
+    val dataColumns = outputColumns.filterNot(partitionSet.contains)
+    val writerBucketSpec = V1WritesUtils.getWriterBucketSpec(bucketSpec, dataColumns, options)
+    val sortColumns = V1WritesUtils.getBucketSortColumns(bucketSpec, dataColumns)
+
+    if (SQLConf.get.maxConcurrentOutputFileWriters > 0 && sortColumns.isEmpty) {
+      // Do not insert logical sort when concurrent output writers are enabled.
+      Seq.empty
+    } else {
+      // We should first sort by partition columns, then bucket id, and finally sorting columns.
+      // Note we do not need to convert empty string partition columns to null when sorting the
+      // columns since null and empty string values will be next to each other.
+      (partitionColumns ++ writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns)
+        .map(SortOrder(_, Ascending))
+    }
+  }
+}
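Finally, a hedged sketch contrasting the two bucket-id expressions `getWriterBucketSpec` can produce. It assumes `BucketingUtils` and `V1WritesUtils` are reachable from the calling code (for example a test alongside Spark's own sources), and the column name is illustrative:

```scala
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.execution.datasources.{BucketingUtils, V1WritesUtils}
import org.apache.spark.sql.types.IntegerType

val id = AttributeReference("id", IntegerType)()
val spec = Some(BucketSpec(4, Seq("id"), Nil))

// Spark-native bucketing: HashPartitioning.partitionIdExpression, empty file prefix.
val sparkSpec = V1WritesUtils.getWriterBucketSpec(spec, Seq(id), Map.empty)

// Hive-compatible bucketing: pmod(HiveHash & Int.MaxValue, numBuckets).
val hiveSpec = V1WritesUtils.getWriterBucketSpec(
  spec, Seq(id), Map(BucketingUtils.optionForHiveCompatibleBucketWrite -> "true"))

println(sparkSpec.get.bucketIdExpression.sql)
println(hiveSpec.get.bucketIdExpression.sql)
```

As the comments in the file above note, the Spark-native expression matches `HashPartitioning` so joins on bucketed tables can avoid a shuffle, while the Hive-compatible one keeps the files readable by Hive and Trino.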
