Commit 109e2b5

fix some code review

1 parent f2b9bd8 commit 109e2b5
12 files changed: +54 −33 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala

Lines changed: 24 additions & 0 deletions
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
+import java.net.URI
+
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.util.Shell
 
@@ -162,6 +164,28 @@ object CatalogUtils {
     BucketSpec(numBuckets, normalizedBucketCols, normalizedSortCols)
   }
 
+  /**
+   * Convert URI to String.
+   * Since URI.toString does not decode the URI string, we need to use
+   * Path(uri).toString to decode it.
+   * @param uri the URI of the path
+   * @return the String of the path
+   */
+  def URIToString(uri: Option[URI]): Option[String] = {
+    uri.map(new Path(_).toString)
+  }
+
+  /**
+   * Convert String to URI.
+   * Since new URI(string) does not encode the path string, we need to use
+   * Path(string).toUri to encode it.
+   * @param str the String of the path
+   * @return the URI of the path
+   */
+  def stringToURI(str: Option[String]): Option[URI] = {
+    str.map(new Path(_).toUri)
+  }
+
   private def normalizeColumnName(
       tableName: String,
       tableCols: Seq[String],
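The two new helpers exist because Hadoop's Path round-trips percent-encoding while java.net.URI alone does not. A minimal sketch of that round trip, assuming a warehouse path containing a space (the path itself is illustrative):

import java.net.URI
import org.apache.hadoop.fs.Path

object URIRoundTrip extends App {
  val raw = "/warehouse/my db"               // raw path String with a space
  // stringToURI side: Path(String).toUri percent-encodes special characters.
  val uri: URI = new Path(raw).toUri         // uri.toString == "/warehouse/my%20db"
  // URIToString side: Path(URI).toString decodes them back.
  val back: String = new Path(uri).toString  // back == "/warehouse/my db"
  assert(back == raw)
}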

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 3 additions & 2 deletions
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
+import java.net.URI
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable
@@ -131,7 +132,7 @@ class SessionCatalog(
    * does not contain a scheme, this path will not be changed after the default
    * FileSystem is changed.
    */
-  private def makeQualifiedPath(path: String): Path = {
+  private def makeQualifiedPath(path: URI): Path = {
     val hadoopPath = new Path(path)
     val fs = hadoopPath.getFileSystem(hadoopConf)
     fs.makeQualified(hadoopPath)
@@ -170,7 +171,7 @@ class SessionCatalog(
         "you cannot create a database with this name.")
     }
     validateName(dbName)
-    val qualifiedPath = makeQualifiedPath(new Path(dbDefinition.locationUri).toString).toUri
+    val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri).toUri
     externalCatalog.createDatabase(
       dbDefinition.copy(name = dbName, locationUri = qualifiedPath),
       ignoreIfExists)
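For reference, makeQualifiedPath now takes the database location URI directly and qualifies it against the configured FileSystem. A standalone sketch of that behavior, assuming a scheme-less location on a default (local) Hadoop configuration:

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object QualifyLocation extends App {
  val hadoopConf = new Configuration()
  // Scheme-less database location (illustrative); qualification fills in the
  // default FileSystem's scheme and authority.
  val locationUri = new URI("/user/hive/warehouse/db1.db")
  val hadoopPath = new Path(locationUri)
  val fs = hadoopPath.getFileSystem(hadoopConf)
  println(fs.makeQualified(hadoopPath).toUri)  // e.g. file:/user/hive/warehouse/db1.db
}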

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala

Lines changed: 14 additions & 14 deletions
@@ -342,8 +342,8 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
       "db1",
       "tbl",
       Map("partCol1" -> "1", "partCol2" -> "2")).location
-    val tableLocationPath = new Path(catalog.getTable("db1", "tbl").location)
-    val defaultPartitionLocation = new Path(new Path(tableLocationPath, "partCol1=1"), "partCol2=2")
+    val tableLocation = new Path(catalog.getTable("db1", "tbl").location)
+    val defaultPartitionLocation = new Path(new Path(tableLocation, "partCol1=1"), "partCol2=2")
     assert(new Path(partitionLocation) == defaultPartitionLocation)
   }
 
@@ -367,10 +367,10 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
 
     val partition1 =
       CatalogTablePartition(Map("partCol1" -> "1", "partCol2" -> "2"),
-        storageFormat.copy(locationUri = Some(new Path(newLocationPart1).toUri)))
+        storageFormat.copy(locationUri = Some(newLocationPart1)))
     val partition2 =
       CatalogTablePartition(Map("partCol1" -> "3", "partCol2" -> "4"),
-        storageFormat.copy(locationUri = Some(new Path(newLocationPart2).toUri)))
+        storageFormat.copy(locationUri = Some(newLocationPart2)))
     catalog.createPartitions("db1", "tbl", Seq(partition1), ignoreIfExists = false)
     catalog.createPartitions("db1", "tbl", Seq(partition2), ignoreIfExists = false)
 
@@ -510,7 +510,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
       partitionColumnNames = Seq("partCol1", "partCol2"))
     catalog.createTable(table, ignoreIfExists = false)
 
-    val tableLocationPath = new Path(catalog.getTable("db1", "tbl").location)
+    val tableLocation = new Path(catalog.getTable("db1", "tbl").location)
 
     val mixedCasePart1 = CatalogTablePartition(
       Map("partCol1" -> "1", "partCol2" -> "2"), storageFormat)
@@ -520,12 +520,12 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     catalog.createPartitions("db1", "tbl", Seq(mixedCasePart1), ignoreIfExists = false)
     assert(
       new Path(catalog.getPartition("db1", "tbl", mixedCasePart1.spec).location) ==
-        new Path(new Path(tableLocationPath, "partCol1=1"), "partCol2=2"))
+        new Path(new Path(tableLocation, "partCol1=1"), "partCol2=2"))
 
     catalog.renamePartitions("db1", "tbl", Seq(mixedCasePart1.spec), Seq(mixedCasePart2.spec))
     assert(
       new Path(catalog.getPartition("db1", "tbl", mixedCasePart2.spec).location) ==
-        new Path(new Path(tableLocationPath, "partCol1=3"), "partCol2=4"))
+        new Path(new Path(tableLocation, "partCol1=3"), "partCol2=4"))
 
     // For external tables, RENAME PARTITION should not update the partition location.
     val existingPartLoc = catalog.getPartition("db2", "tbl2", part1.spec).location
@@ -555,21 +555,21 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
   test("alter partitions") {
     val catalog = newBasicCatalog()
     try {
-      val newLocationUri = new Path(newUriForDatabase()).toUri
+      val newLocation = new Path(newUriForDatabase()).toUri
       val newSerde = "com.sparkbricks.text.EasySerde"
       val newSerdeProps = Map("spark" -> "bricks", "compressed" -> "false")
       // alter but keep spec the same
      val oldPart1 = catalog.getPartition("db2", "tbl2", part1.spec)
       val oldPart2 = catalog.getPartition("db2", "tbl2", part2.spec)
       catalog.alterPartitions("db2", "tbl2", Seq(
-        oldPart1.copy(storage = storageFormat.copy(locationUri = Some(newLocationUri))),
-        oldPart2.copy(storage = storageFormat.copy(locationUri = Some(newLocationUri)))))
+        oldPart1.copy(storage = storageFormat.copy(locationUri = Some(newLocation))),
+        oldPart2.copy(storage = storageFormat.copy(locationUri = Some(newLocation)))))
       val newPart1 = catalog.getPartition("db2", "tbl2", part1.spec)
       val newPart2 = catalog.getPartition("db2", "tbl2", part2.spec)
-      assert(newPart1.storage.locationUri == Some(newLocationUri))
-      assert(newPart2.storage.locationUri == Some(newLocationUri))
-      assert(oldPart1.storage.locationUri != Some(newLocationUri))
-      assert(oldPart2.storage.locationUri != Some(newLocationUri))
+      assert(newPart1.storage.locationUri == Some(newLocation))
+      assert(newPart2.storage.locationUri == Some(newLocation))
+      assert(oldPart1.storage.locationUri != Some(newLocation))
+      assert(oldPart2.storage.locationUri != Some(newLocation))
       // alter other storage information
       catalog.alterPartitions("db2", "tbl2", Seq(
         oldPart1.copy(storage = storageFormat.copy(serde = Some(newSerde))),

sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala

Lines changed: 0 additions & 1 deletion
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.catalog
 
-import java.net.URI
 import javax.annotation.Nullable
 
 import org.apache.spark.annotation.InterfaceStability

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 3 additions & 4 deletions
@@ -387,7 +387,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
         "LOCATION and 'path' in OPTIONS are both used to indicate the custom table path, " +
           "you can only specify one of them.", ctx)
     }
-    val customLocation = storage.locationUri.orElse(location)
+    val customLocation = storage.locationUri.orElse(CatalogUtils.stringToURI(location))
 
     val tableType = if (customLocation.isDefined) {
       CatalogTableType.EXTERNAL
@@ -398,8 +398,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     val tableDesc = CatalogTable(
       identifier = table,
       tableType = tableType,
-      storage = storage.copy(locationUri = customLocation.map{ loc =>
-        new Path(loc.toString).toUri}),
+      storage = storage.copy(locationUri = customLocation),
       schema = schema.getOrElse(new StructType),
       provider = Some(provider),
       partitionColumnNames = partitionColumnNames,
@@ -1083,7 +1082,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       operationNotAllowed("CREATE EXTERNAL TABLE must be accompanied by LOCATION", ctx)
     }
 
-    val locUri = location.map{ loc => new Path(loc).toUri }
+    val locUri = CatalogUtils.stringToURI(location)
     val storage = CatalogStorageFormat(
       locationUri = locUri,
       inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat),

sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala

Lines changed: 1 addition & 1 deletion
@@ -184,7 +184,7 @@ case class CreateDataSourceTableAsSelectCommand(
       mode: SaveMode,
       tableExists: Boolean): BaseRelation = {
     // Create the relation based on the input logical plan: `data`.
-    val pathOption = tableLocation.map("path" -> new Path(_).toString)
+    val pathOption = CatalogUtils.URIToString(tableLocation).map("path" -> _)
     val dataSource = DataSource(
       session,
       className = table.provider.get,

sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala

Lines changed: 1 addition & 1 deletion
@@ -427,7 +427,7 @@ case class AlterTableAddPartitionCommand(
         sparkSession.sessionState.conf.resolver)
       // inherit table storage format (possibly except for location)
       CatalogTablePartition(normalizedSpec, table.storage.copy(
-        locationUri = location.map(new Path(_).toUri)))
+        locationUri = CatalogUtils.stringToURI(location)))
     }
     catalog.createPartitions(table.identifier, parts, ignoreIfExists = ifNotExists)
     Seq.empty[Row]

sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala

Lines changed: 3 additions & 3 deletions
@@ -79,7 +79,7 @@ case class CreateTableLikeCommand(
     CatalogTable(
       identifier = targetTable,
       tableType = tblType,
-      storage = sourceTableDesc.storage.copy(locationUri = location.map(new Path(_).toUri)),
+      storage = sourceTableDesc.storage.copy(locationUri = CatalogUtils.stringToURI(location)),
       schema = sourceTableDesc.schema,
       provider = newProvider,
       partitionColumnNames = sourceTableDesc.partitionColumnNames,
@@ -495,7 +495,7 @@ case class DescribeTableCommand(
     append(buffer, "Owner:", table.owner, "")
     append(buffer, "Create Time:", new Date(table.createTime).toString, "")
     append(buffer, "Last Access Time:", new Date(table.lastAccessTime).toString, "")
-    append(buffer, "Location:", table.storage.locationUri.map(new Path(_).toString)
+    append(buffer, "Location:", CatalogUtils.URIToString(table.storage.locationUri)
       .getOrElse(""), "")
     append(buffer, "Table Type:", table.tableType.name, "")
     table.stats.foreach(s => append(buffer, "Statistics:", s.simpleString, ""))
@@ -588,7 +588,7 @@ case class DescribeTableCommand(
     append(buffer, "Partition Value:", s"[${partition.spec.values.mkString(", ")}]", "")
     append(buffer, "Database:", table.database, "")
     append(buffer, "Table:", tableIdentifier.table, "")
-    append(buffer, "Location:", partition.storage.locationUri.map(new Path(_).toString)
+    append(buffer, "Location:", CatalogUtils.URIToString(partition.storage.locationUri)
       .getOrElse(""), "")
     append(buffer, "Partition Parameters:", "", "")
     partition.parameters.foreach { case (key, value) =>

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 2 additions & 2 deletions
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogUtils}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
@@ -598,6 +598,6 @@ object DataSource {
     val path = CaseInsensitiveMap(options).get("path")
     val optionsWithoutPath = options.filterKeys(_.toLowerCase != "path")
     CatalogStorageFormat.empty.copy(
-      locationUri = path.map(new Path(_).toUri), properties = optionsWithoutPath)
+      locationUri = CatalogUtils.stringToURI(path), properties = optionsWithoutPath)
   }
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 2 additions & 2 deletions
@@ -29,7 +29,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow, QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.CatalogRelation
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogUtils}
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -222,7 +222,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
 
     val plan = cache.get(qualifiedTableName, new Callable[LogicalPlan]() {
       override def call(): LogicalPlan = {
-        val pathOption = table.storage.locationUri.map("path" -> new Path(_).toString)
+        val pathOption = CatalogUtils.URIToString(table.storage.locationUri).map("path" -> _)
         val dataSource =
           DataSource(
             sparkSession,
