DDLSuite.scala
@@ -132,13 +132,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
   }
 
-  private def makeQualifiedPath(path: String): URI = {
-    // copy-paste from SessionCatalog
-    val hadoopPath = new Path(path)
-    val fs = hadoopPath.getFileSystem(sparkContext.hadoopConfiguration)
-    fs.makeQualified(hadoopPath).toUri
-  }
-
   test("Create Database using Default Warehouse Path") {
     val catalog = spark.sessionState.catalog
     val dbName = "db1"
@@ -2086,9 +2079,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       Seq(1).toDF("a").write.saveAsTable("t")
       val tblloc = new File(loc, "t")
       val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-      val tblPath = new Path(tblloc.getAbsolutePath)
-      val fs = tblPath.getFileSystem(spark.sessionState.newHadoopConf())
-      assert(table.location == fs.makeQualified(tblPath).toUri)
+      assert(table.location == makeQualifiedPath(tblloc.getAbsolutePath))
       assert(tblloc.listFiles().nonEmpty)
     }
   }
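With its private copy removed, DDLSuite now picks up the shared makeQualifiedPath helper added to SQLTestUtils below. A minimal sketch of the resulting test pattern (the table name, schema, and CREATE TABLE statement are illustrative, not taken from this PR):

    withTempDir { dir =>
      // Create a table at an explicit local location (illustrative DDL).
      spark.sql(s"CREATE TABLE t(i INT) USING parquet LOCATION '$dir'")
      val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
      // The catalog stores a fully qualified URI (e.g. file:/tmp/...), so the
      // assertion compares against the qualified form of the local directory.
      assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
    }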
SQLTestUtils.scala
@@ -18,13 +18,14 @@
 package org.apache.spark.sql.test
 
 import java.io.File
+import java.net.URI
 import java.util.UUID
 
 import scala.language.implicitConversions
 import scala.util.Try
 import scala.util.control.NonFatal
 
-import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.SparkFunSuite
@@ -294,6 +295,17 @@ private[sql] trait SQLTestUtils
       test(name) { runOnThread() }
     }
   }
+
+  /**
+   * Makes the given path qualified. Qualifying the path up front ensures that
+   * a path without a scheme does not change if the default FileSystem is
+   * changed later.
+   */
+  def makeQualifiedPath(path: String): URI = {
[Review comment, Contributor]: seems we can take File as parameter?
+    val hadoopPath = new Path(path)
+    val fs = hadoopPath.getFileSystem(spark.sessionState.newHadoopConf())
+    fs.makeQualified(hadoopPath).toUri
+  }
 }
 
 private[sql] object SQLTestUtils {
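The reviewer's suggestion above would amount to a small convenience overload; a hypothetical sketch, not part of this PR:

    // Hypothetical File-based overload, as suggested in review; it simply
    // delegates to the String-based helper so callers can pass temp
    // directories directly instead of spelling out .getAbsolutePath.
    def makeQualifiedPath(file: File): URI =
      makeQualifiedPath(file.getAbsolutePath)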
HiveDDLSuite.scala
@@ -1654,10 +1654,8 @@ class HiveDDLSuite
              |LOCATION '$dir'
              |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
            """.stripMargin)
-        val dirPath = new Path(dir.getAbsolutePath)
-        val fs = dirPath.getFileSystem(spark.sessionState.newHadoopConf())
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-        assert(new Path(table.location) == fs.makeQualified(dirPath))
+        assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
 
         checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
       }
@@ -1675,10 +1673,8 @@ class HiveDDLSuite
              |LOCATION '$dir'
              |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
            """.stripMargin)
-        val dirPath = new Path(dir.getAbsolutePath)
-        val fs = dirPath.getFileSystem(spark.sessionState.newHadoopConf())
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
-        assert(new Path(table.location) == fs.makeQualified(dirPath))
+        assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
 
         val partDir = new File(dir, "a=3")
         assert(partDir.exists())
@@ -1792,9 +1788,7 @@ class HiveDDLSuite
            """.stripMargin)
 
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-        val path = new Path(loc.getAbsolutePath)
-        val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
-        assert(table.location == fs.makeQualified(path).toUri)
+        assert(table.location == makeQualifiedPath(loc.getAbsolutePath))
         assert(new Path(table.location).toString.contains(specialChars))
 
         assert(loc.listFiles().isEmpty)
@@ -1822,9 +1816,7 @@ class HiveDDLSuite
            """.stripMargin)
 
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
-        val path = new Path(loc.getAbsolutePath)
-        val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
-        assert(table.location == fs.makeQualified(path).toUri)
+        assert(table.location == makeQualifiedPath(loc.getAbsolutePath))
         assert(new Path(table.location).toString.contains(specialChars))
 
         assert(loc.listFiles().isEmpty)
@@ -1871,7 +1863,7 @@ class HiveDDLSuite
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
         val tblPath = new Path(tblloc.getAbsolutePath)
         val fs = tblPath.getFileSystem(spark.sessionState.newHadoopConf())
-        assert(table.location == fs.makeQualified(tblPath).toUri)
+        assert(table.location == makeQualifiedPath(tblloc.getAbsolutePath))
         assert(tblloc.listFiles().nonEmpty)
       }
     }
OrcHadoopFsRelationSuite.scala
@@ -19,10 +19,10 @@ package org.apache.spark.sql.hive.orc
 
 import java.io.File
 
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
 
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.catalog.CatalogUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.HadoopFsRelationTest
 import org.apache.spark.sql.types._
@@ -42,12 +42,9 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
 
   test("save()/load() - partitioned table - simple queries - partition columns in data") {
     withTempDir { file =>
-      val basePath = new Path(file.getCanonicalPath)
-      val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
-      val qualifiedBasePath = fs.makeQualified(basePath)
-
       for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
-        val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
+        val partitionDir = new Path(
+          CatalogUtils.URIToString(makeQualifiedPath(file.getCanonicalPath)), s"p1=$p1/p2=$p2")
         sparkContext
           .parallelize(for (i <- 1 to 3) yield (i, s"val_$i", p1))
           .toDF("a", "b", "p1")
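The rewritten loop converts the qualified java.net.URI back to a String because this Path constructor takes string arguments; CatalogUtils.URIToString performs that conversion. A minimal sketch of the round trip (the concrete directory is illustrative):

    // makeQualifiedPath qualifies a local path, e.g. /tmp/test-dir -> file:/tmp/test-dir.
    val qualified: URI = makeQualifiedPath(file.getCanonicalPath)
    // URIToString yields the string form that new Path(parent, child) expects.
    val partitionDir = new Path(CatalogUtils.URIToString(qualified), "p1=1/p2=foo")
    // partitionDir.toString == "file:/tmp/test-dir/p1=1/p2=foo"

The same rewrite is applied verbatim to the JSON, Parquet, and SimpleText relation suites below.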
JsonHadoopFsRelationSuite.scala
@@ -21,8 +21,8 @@ import java.math.BigDecimal
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.catalog.CatalogUtils
 import org.apache.spark.sql.types._
 
 class JsonHadoopFsRelationSuite extends HadoopFsRelationTest {
@@ -38,12 +38,9 @@ class JsonHadoopFsRelationSuite extends HadoopFsRelationTest {
 
   test("save()/load() - partitioned table - simple queries - partition columns in data") {
     withTempDir { file =>
-      val basePath = new Path(file.getCanonicalPath)
-      val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
-      val qualifiedBasePath = fs.makeQualified(basePath)
-
       for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
-        val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
+        val partitionDir = new Path(
+          CatalogUtils.URIToString(makeQualifiedPath(file.getCanonicalPath)), s"p1=$p1/p2=$p2")
         sparkContext
           .parallelize(for (i <- 1 to 3) yield s"""{"a":$i,"b":"val_$i"}""")
           .saveAsTextFile(partitionDir.toString)
ParquetHadoopFsRelationSuite.scala
@@ -23,8 +23,8 @@ import com.google.common.io.Files
 import org.apache.hadoop.fs.Path
 import org.apache.parquet.hadoop.ParquetOutputFormat
 
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.catalog.CatalogUtils
 import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -44,12 +44,9 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
 
   test("save()/load() - partitioned table - simple queries - partition columns in data") {
     withTempDir { file =>
-      val basePath = new Path(file.getCanonicalPath)
-      val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
-      val qualifiedBasePath = fs.makeQualified(basePath)
-
       for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
-        val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
+        val partitionDir = new Path(
+          CatalogUtils.URIToString(makeQualifiedPath(file.getCanonicalPath)), s"p1=$p1/p2=$p2")
         sparkContext
           .parallelize(for (i <- 1 to 3) yield (i, s"val_$i", p1))
           .toDF("a", "b", "p1")
SimpleTextHadoopFsRelationSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.sources
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.catalyst.catalog.CatalogUtils
 import org.apache.spark.sql.catalyst.expressions.PredicateHelper
 import org.apache.spark.sql.types._
 
@@ -45,12 +45,9 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with PredicateHelper {
 
   test("save()/load() - partitioned table - simple queries - partition columns in data") {
     withTempDir { file =>
-      val basePath = new Path(file.getCanonicalPath)
-      val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
-      val qualifiedBasePath = fs.makeQualified(basePath)
-
       for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
-        val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
+        val partitionDir = new Path(
+          CatalogUtils.URIToString(makeQualifiedPath(file.getCanonicalPath)), s"p1=$p1/p2=$p2")
         sparkContext
           .parallelize(for (i <- 1 to 3) yield s"$i,val_$i,$p1")
           .saveAsTextFile(partitionDir.toString)