Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql.catalyst.catalog

import java.net.URI

import org.apache.hadoop.fs.Path
import org.apache.hadoop.util.Shell

Expand Down Expand Up @@ -162,6 +164,30 @@ object CatalogUtils {
BucketSpec(numBuckets, normalizedBucketCols, normalizedSortCols)
}

/**
 * Render a URI as a decoded path string.
 * `URI.toString` leaves percent-escapes untouched (e.g. it does not turn '%25' back into '%'),
 * so the URI is wrapped in a hadoop Path and `Path.toString` is relied on to do the decoding.
 * @param uri the URI of the path
 * @return the String of the path
 */
def URIToString(uri: URI): String = {
  val hadoopPath = new Path(uri)
  hadoopPath.toString
}

/**
 * Build a URI from a path string.
 * `new URI(string)` does not encode its argument (e.g. it does not turn '%' into '%25'),
 * so the string is wrapped in a hadoop Path and `Path.toUri` is relied on to do the encoding.
 * NOTE(review): a string that is already a percent-encoded URI would presumably be encoded
 * a second time (e.g. '%20' becoming '%2520') - confirm what callers are allowed to pass in.
 * @param str the String of the path
 * @return the URI of the path
 */
def stringToURI(str: String): URI = {
  val hadoopPath = new Path(str)
  hadoopPath.toUri
}

private def normalizeColumnName(
tableName: String,
tableCols: Seq[String],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ class InMemoryCatalog(
tableDefinition.storage.locationUri.isEmpty

val tableWithLocation = if (needDefaultTableLocation) {
val defaultTableLocation = new Path(catalog(db).db.locationUri, table)
val defaultTableLocation = new Path(new Path(catalog(db).db.locationUri), table)
try {
val fs = defaultTableLocation.getFileSystem(hadoopConfig)
fs.mkdirs(defaultTableLocation)
Expand All @@ -211,7 +211,7 @@ class InMemoryCatalog(
throw new SparkException(s"Unable to create table $table as failed " +
s"to create its directory $defaultTableLocation", e)
}
tableDefinition.withNewStorage(locationUri = Some(defaultTableLocation.toUri.toString))
tableDefinition.withNewStorage(locationUri = Some(defaultTableLocation.toUri))
} else {
tableDefinition
}
Expand Down Expand Up @@ -274,7 +274,7 @@ class InMemoryCatalog(
"Managed table should always have table location, as we will assign a default location " +
"to it if it doesn't have one.")
val oldDir = new Path(oldDesc.table.location)
val newDir = new Path(catalog(db).db.locationUri, newName)
val newDir = new Path(new Path(catalog(db).db.locationUri), newName)
try {
val fs = oldDir.getFileSystem(hadoopConfig)
fs.rename(oldDir, newDir)
Expand All @@ -283,7 +283,7 @@ class InMemoryCatalog(
throw new SparkException(s"Unable to rename table $oldName to $newName as failed " +
s"to rename its directory $oldDir", e)
}
oldDesc.table = oldDesc.table.withNewStorage(locationUri = Some(newDir.toUri.toString))
oldDesc.table = oldDesc.table.withNewStorage(locationUri = Some(newDir.toUri))
}

catalog(db).tables.put(newName, oldDesc)
Expand Down Expand Up @@ -389,7 +389,7 @@ class InMemoryCatalog(

existingParts.put(
p.spec,
p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toString))))
p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toUri))))
}
}

Expand Down Expand Up @@ -462,7 +462,7 @@ class InMemoryCatalog(
}
oldPartition.copy(
spec = newSpec,
storage = oldPartition.storage.copy(locationUri = Some(newPartPath.toString)))
storage = oldPartition.storage.copy(locationUri = Some(newPartPath.toUri)))
} else {
oldPartition.copy(spec = newSpec)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.catalog

import java.net.URI
import javax.annotation.concurrent.GuardedBy

import scala.collection.mutable
Expand Down Expand Up @@ -131,10 +132,10 @@ class SessionCatalog(
* does not contain a scheme, this path will not be changed after the default
* FileSystem is changed.
*/
private def makeQualifiedPath(path: String): Path = {
private def makeQualifiedPath(path: URI): URI = {
val hadoopPath = new Path(path)
val fs = hadoopPath.getFileSystem(hadoopConf)
fs.makeQualified(hadoopPath)
fs.makeQualified(hadoopPath).toUri
}

private def requireDbExists(db: String): Unit = {
Expand Down Expand Up @@ -170,7 +171,7 @@ class SessionCatalog(
"you cannot create a database with this name.")
}
validateName(dbName)
val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri).toString
val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri)
externalCatalog.createDatabase(
dbDefinition.copy(name = dbName, locationUri = qualifiedPath),
ignoreIfExists)
Expand Down Expand Up @@ -228,9 +229,9 @@ class SessionCatalog(
* Get the path for creating a non-default database when database location is not provided
* by users.
*/
def getDefaultDBPath(db: String): String = {
def getDefaultDBPath(db: String): URI = {
val database = formatDatabaseName(db)
new Path(new Path(conf.warehousePath), database + ".db").toString
new Path(new Path(conf.warehousePath), database + ".db").toUri
}

// ----------------------------------------------------------------------------
Expand Down Expand Up @@ -351,11 +352,11 @@ class SessionCatalog(
db, table, loadPath, spec, isOverwrite, inheritTableSpecs, isSrcLocal)
}

def defaultTablePath(tableIdent: TableIdentifier): String = {
def defaultTablePath(tableIdent: TableIdentifier): URI = {
val dbName = formatDatabaseName(tableIdent.database.getOrElse(getCurrentDatabase))
val dbLocation = getDatabaseMetadata(dbName).locationUri

new Path(new Path(dbLocation), formatTableName(tableIdent.table)).toString
new Path(new Path(dbLocation), formatTableName(tableIdent.table)).toUri
}

// ----------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.catalog

import java.net.URI
import java.util.Date

import com.google.common.base.Objects
Expand Down Expand Up @@ -48,10 +49,7 @@ case class CatalogFunction(
* Storage format, used to describe how a partition or a table is stored.
*/
case class CatalogStorageFormat(
// TODO(ekl) consider storing this field as java.net.URI for type safety. Note that this must
// be converted to/from a hadoop Path object using new Path(new URI(locationUri)) and
// path.toUri respectively before use as a filesystem path due to URI char escaping.
locationUri: Option[String],
locationUri: Option[URI],
inputFormat: Option[String],
outputFormat: Option[String],
serde: Option[String],
Expand Down Expand Up @@ -105,7 +103,7 @@ case class CatalogTablePartition(
}

/** Return the partition location, assuming it is specified. */
def location: String = storage.locationUri.getOrElse {
def location: URI = storage.locationUri.getOrElse {
val specString = spec.map { case (k, v) => s"$k=$v" }.mkString(", ")
throw new AnalysisException(s"Partition [$specString] did not specify locationUri")
}
Expand Down Expand Up @@ -210,7 +208,7 @@ case class CatalogTable(
}

/** Return the table location, assuming it is specified. */
def location: String = storage.locationUri.getOrElse {
def location: URI = storage.locationUri.getOrElse {
throw new AnalysisException(s"table $identifier did not specify locationUri")
}

Expand Down Expand Up @@ -241,7 +239,7 @@ case class CatalogTable(

/** Syntactic sugar to update a field in `storage`. */
def withNewStorage(
locationUri: Option[String] = storage.locationUri,
locationUri: Option[URI] = storage.locationUri,
inputFormat: Option[String] = storage.inputFormat,
outputFormat: Option[String] = storage.outputFormat,
compressed: Boolean = false,
Expand Down Expand Up @@ -337,7 +335,7 @@ object CatalogTableType {
case class CatalogDatabase(
name: String,
description: String,
locationUri: String,
locationUri: URI,
properties: Map[String, String])


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql.catalyst.catalog

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterEach
Expand Down Expand Up @@ -340,7 +342,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
"db1",
"tbl",
Map("partCol1" -> "1", "partCol2" -> "2")).location
val tableLocation = catalog.getTable("db1", "tbl").location
val tableLocation = new Path(catalog.getTable("db1", "tbl").location)
val defaultPartitionLocation = new Path(new Path(tableLocation, "partCol1=1"), "partCol2=2")
assert(new Path(partitionLocation) == defaultPartitionLocation)
}
Expand Down Expand Up @@ -508,7 +510,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
partitionColumnNames = Seq("partCol1", "partCol2"))
catalog.createTable(table, ignoreIfExists = false)

val tableLocation = catalog.getTable("db1", "tbl").location
val tableLocation = new Path(catalog.getTable("db1", "tbl").location)

val mixedCasePart1 = CatalogTablePartition(
Map("partCol1" -> "1", "partCol2" -> "2"), storageFormat)
Expand Down Expand Up @@ -699,7 +701,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
// File System operations
// --------------------------------------------------------------------------

private def exists(uri: String, children: String*): Boolean = {
private def exists(uri: URI, children: String*): Boolean = {
val base = new Path(uri)
val finalPath = children.foldLeft(base) {
case (parent, child) => new Path(parent, child)
Expand Down Expand Up @@ -742,7 +744,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
identifier = TableIdentifier("external_table", Some("db1")),
tableType = CatalogTableType.EXTERNAL,
storage = CatalogStorageFormat(
Some(Utils.createTempDir().getAbsolutePath),
Some(Utils.createTempDir().toURI),
None, None, None, false, Map.empty),
schema = new StructType().add("a", "int").add("b", "string"),
provider = Some(defaultProvider)
Expand Down Expand Up @@ -790,7 +792,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
val partWithExistingDir = CatalogTablePartition(
Map("partCol1" -> "7", "partCol2" -> "8"),
CatalogStorageFormat(
Some(tempPath.toURI.toString),
Some(tempPath.toURI),
None, None, None, false, Map.empty))
catalog.createPartitions("db1", "tbl", Seq(partWithExistingDir), ignoreIfExists = false)

Expand All @@ -799,7 +801,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
val partWithNonExistingDir = CatalogTablePartition(
Map("partCol1" -> "9", "partCol2" -> "10"),
CatalogStorageFormat(
Some(tempPath.toURI.toString),
Some(tempPath.toURI),
None, None, None, false, Map.empty))
catalog.createPartitions("db1", "tbl", Seq(partWithNonExistingDir), ignoreIfExists = false)
assert(tempPath.exists())
Expand Down Expand Up @@ -883,7 +885,7 @@ abstract class CatalogTestUtils {

def newFunc(): CatalogFunction = newFunc("funcName")

def newUriForDatabase(): String = Utils.createTempDir().toURI.toString.stripSuffix("/")
def newUriForDatabase(): URI = new URI(Utils.createTempDir().toURI.toString.stripSuffix("/"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can simplify it and write Utils.createTempDir().toURI

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Utils.createTempDir().toURI has a suffix '/', here we have to strip it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan After trying to use Path for the comparison, I think stripSuffix is the simplest way here.

Paths built with the String constructor compare equal even when one has a trailing / and the other does not.

scala> val x = new Path("/ab/c/")
x: org.apache.hadoop.fs.Path = /ab/c

scala> val y = new Path("/ab/c")
y: org.apache.hadoop.fs.Path = /ab/c

scala> x == y
res0: Boolean = true

Paths built with the URI constructor do not compare equal when one has a trailing / and the other does not.

scala> val x =new URI("/a/b/c/")
x: java.net.URI = /a/b/c/

scala> val y =new URI("/a/b/c")
y: java.net.URI = /a/b/c

scala> x == y
res1: Boolean = false

scala> val x1 =new Path(x)
x1: org.apache.hadoop.fs.Path = /a/b/c/

scala> val y1 =new Path(y)
y1: org.apache.hadoop.fs.Path = /a/b/c

scala> x1 == y1
res2: Boolean = false

So only a Path built from a String ignores the trailing /. If we have a URI in hand and want to compare it with another URI, we must first convert both to Strings and then construct Paths from those Strings; only after these two steps can we compare them while ignoring the trailing /.

But I think that is more complicated. Here we normalize the original URI with stripSuffix and then compare the two URIs directly, which is simpler.

Must we convert it to a Path to compare?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's ok if we always compare URI with Path, instead of converting it to string.


def newDb(name: String): CatalogDatabase = {
CatalogDatabase(name, name + " description", newUriForDatabase(), Map.empty)
Expand All @@ -895,7 +897,7 @@ abstract class CatalogTestUtils {
CatalogTable(
identifier = TableIdentifier(name, database),
tableType = CatalogTableType.EXTERNAL,
storage = storageFormat.copy(locationUri = Some(Utils.createTempDir().getAbsolutePath)),
storage = storageFormat.copy(locationUri = Some(Utils.createTempDir().toURI)),
schema = new StructType()
.add("col1", "int")
.add("col2", "string")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
"LOCATION and 'path' in OPTIONS are both used to indicate the custom table path, " +
"you can only specify one of them.", ctx)
}
val customLocation = storage.locationUri.orElse(location)
val customLocation = storage.locationUri.orElse(location.map(CatalogUtils.stringToURI(_)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: xx.map(CatalogUtils.stringToURI)


Copy link
Member

@HyukjinKwon HyukjinKwon Apr 5, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems this has a double-de/encoding problem when the input path is an URI.

scala> new org.apache.hadoop.fs.Path(new java.io.File("a b").toURI.toString).toUri.toString
res1: String = file:/.../a%2520b

A space character in a URI is encoded as %20, and a % character is encoded as %25. It seems the input URI already contains %20, and its % is then encoded again, turning %20 into %2520.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, it seems we have a similar util with CatalogUtils.stringToURI in org.apache.spark.util.Utils.resolveURI. Could we consolidate them? I did not replace it when I identified this because some existing tests in org.apache.spark.util.UtilsSuite were broken but I guess it would be fine if there is a coherent reason. These broken cases might be bugs.

@windpiger could you double check my comments and open a followup if I was not wrong?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not replace it when I identified this because some existing tests in org.apache.spark.util.UtilsSuite were broken but I guess it would be fine if there is a coherent reason. These broken cases might be bugs.

FYI.. cc @sarutak

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you reproduce it as a bug, please submit a PR to fix it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how could the input path be a URI string?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm.. is the input path always expected to be a path? I thought we support both URI and path forms.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's ambiguous to support both: what if users really do want to create a path named /tmp/%25?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To my knowledge, a fully qualified HDFS path is in URI form. If we do not support this, we are virtually disallowing fully qualified paths. I understand it sounds ambiguous, but disallowing them does not look like a good solution.

Also, if users want to access local files or S3 when the default scheme is hdfs, I guess that would require changing the default scheme every time.

Copy link
Member

@HyukjinKwon HyukjinKwon Apr 5, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In addition, I guess we already have a lot of tests with URI input paths (I worked on many of them to make the tests pass on Windows), which I guess implies that some committers do not disagree with this.

IMHO, URIs should be supported correctly first, because the local path form is an abbreviation of the fully qualified path.

Copy link
Member

@sarutak sarutak Apr 5, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how could the input path be a URI string?

How about external tables on S3?

val tableType = if (customLocation.isDefined) {
CatalogTableType.EXTERNAL
Expand Down Expand Up @@ -1080,8 +1080,10 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
if (external && location.isEmpty) {
operationNotAllowed("CREATE EXTERNAL TABLE must be accompanied by LOCATION", ctx)
}

val locUri = location.map(CatalogUtils.stringToURI(_))
val storage = CatalogStorageFormat(
locationUri = location,
locationUri = locUri,
inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat),
outputFormat = fileStorage.outputFormat.orElse(defaultStorage.outputFormat),
serde = rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde),
Expand Down Expand Up @@ -1132,7 +1134,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
// At here, both rowStorage.serdeProperties and fileStorage.serdeProperties
// are empty Maps.
val newTableDesc = tableDesc.copy(
storage = CatalogStorageFormat.empty.copy(locationUri = location),
storage = CatalogStorageFormat.empty.copy(locationUri = locUri),
provider = Some(conf.defaultDataSourceName))
CreateTable(newTableDesc, mode, Some(q))
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

package org.apache.spark.sql.execution.command

import java.net.URI

import org.apache.hadoop.fs.Path

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
Expand Down Expand Up @@ -54,7 +58,7 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo

// Create the relation to validate the arguments before writing the metadata to the metastore,
// and infer the table schema and partition if users didn't specify schema in CREATE TABLE.
val pathOption = table.storage.locationUri.map("path" -> _)
val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
// Fill in some default table options from the session conf
val tableWithDefaultOptions = table.copy(
identifier = table.identifier.copy(
Expand Down Expand Up @@ -175,12 +179,12 @@ case class CreateDataSourceTableAsSelectCommand(
private def saveDataIntoTable(
session: SparkSession,
table: CatalogTable,
tableLocation: Option[String],
tableLocation: Option[URI],
data: LogicalPlan,
mode: SaveMode,
tableExists: Boolean): BaseRelation = {
// Create the relation based on the input logical plan: `data`.
val pathOption = tableLocation.map("path" -> _)
val pathOption = tableLocation.map("path" -> CatalogUtils.URIToString(_))
val dataSource = DataSource(
session,
className = table.provider.get,
Expand Down
Loading