
Commit 410b6f0

CTAS for hive serde table should work for all hive versions
1 parent 88f559f

2 files changed (+67, -12)
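The user-visible scenario is CREATE TABLE ... AS SELECT (CTAS) into a Hive serde table. Before this change the temporary write path was always computed the Hive 1.2 way (what becomes newStyleExternalTempPath below), which could break CTAS against older metastores. A minimal reproduction of the statement that should now work for every supported Hive version (table name illustrative, mirroring the new test):

  // CTAS into a Hive serde table; the write goes through a temporary
  // directory whose location depends on the metastore's Hive version.
  spark.sql("CREATE TABLE tbl AS SELECT 1 AS a")
  spark.table("tbl").show()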

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala

Lines changed: 50 additions & 10 deletions
@@ -22,7 +22,6 @@ import java.net.URI
 import java.text.SimpleDateFormat
 import java.util.{Date, Locale, Random}
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.common.FileUtils
 import org.apache.hadoop.hive.ql.exec.TaskRunner
@@ -86,14 +85,15 @@ case class InsertIntoHiveTable(
 
   val hadoopConf = sessionState.newHadoopConf()
   val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
+  val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
 
   private def executionId: String = {
     val rand: Random = new Random
     val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US)
     "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
   }
 
-  private def getStagingDir(inputPath: Path, hadoopConf: Configuration): Path = {
+  private def getStagingDir(inputPath: Path): Path = {
     val inputPathUri: URI = inputPath.toUri
     val inputPathName: String = inputPathUri.getPath
     val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
@@ -121,21 +121,61 @@ case class InsertIntoHiveTable(
     return dir
   }
 
-  private def getExternalScratchDir(extURI: URI, hadoopConf: Configuration): Path = {
-    getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), hadoopConf)
+  private def getExternalScratchDir(extURI: URI): Path = {
+    getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath))
   }
 
-  def getExternalTmpPath(path: Path, hadoopConf: Configuration): Path = {
+  def getExternalTmpPath(path: Path): Path = {
+    val hiveVersion = externalCatalog.asInstanceOf[HiveExternalCatalog].client.version.fullVersion
+    if (hiveVersion.startsWith("0.12") ||
+        hiveVersion.startsWith("0.13") ||
+        hiveVersion.startsWith("0.14") ||
+        hiveVersion.startsWith("1.0")) {
+      oldStyleExternalTempPath(path)
+    } else if (hiveVersion.startsWith("1.1") || hiveVersion.startsWith("1.2")) {
+      newStyleExternalTempPath(path)
+    } else {
+      throw new IllegalStateException("Unsupported hive version: " + hiveVersion)
+    }
+  }
+
+  // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
+  def oldStyleExternalTempPath(path: Path): Path = {
+    val extURI: URI = path.toUri
+    val scratchPath = new Path(scratchDir, executionId)
+    var dirPath = new Path(
+      extURI.getScheme,
+      extURI.getAuthority,
+      scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())
+
+    try {
+      val fs: FileSystem = dirPath.getFileSystem(hadoopConf)
+      dirPath = new Path(fs.makeQualified(dirPath).toString())
+
+      if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) {
+        throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString)
+      }
+      fs.deleteOnExit(dirPath)
+    } catch {
+      case e: IOException =>
+        throw new RuntimeException("Cannot create staging directory: " + dirPath.toString, e)
+
+    }
+    dirPath
+  }
+
+  // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2
+  def newStyleExternalTempPath(path: Path): Path = {
     val extURI: URI = path.toUri
     if (extURI.getScheme == "viewfs") {
-      getExtTmpPathRelTo(path.getParent, hadoopConf)
+      getExtTmpPathRelTo(path.getParent)
     } else {
-      new Path(getExternalScratchDir(extURI, hadoopConf), "-ext-10000")
+      new Path(getExternalScratchDir(extURI), "-ext-10000")
     }
   }
 
-  def getExtTmpPathRelTo(path: Path, hadoopConf: Configuration): Path = {
-    new Path(getStagingDir(path, hadoopConf), "-ext-10000") // Hive uses 10000
+  def getExtTmpPathRelTo(path: Path): Path = {
+    new Path(getStagingDir(path), "-ext-10000") // Hive uses 10000
   }
 
   private def saveAsHiveFile(
@@ -172,7 +212,7 @@ case class InsertIntoHiveTable(
     // instances within the closure, since Serializer is not serializable while TableDesc is.
     val tableDesc = table.tableDesc
     val tableLocation = table.hiveQlTable.getDataLocation
-    val tmpLocation = getExternalTmpPath(tableLocation, hadoopConf)
+    val tmpLocation = getExternalTmpPath(tableLocation)
    val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
    val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean
 
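The crux of the change is the dispatch in getExternalTmpPath: it asks the Hive client for its fullVersion and picks between the scratch-dir layout of Hive 0.12 through 1.0 and the staging-dir layout of Hive 1.1/1.2. A standalone sketch of just that branching, assuming fullVersion strings like "0.13.1" or "1.2.1" (the helper name and return values are illustrative, not part of the commit):

  // Illustrative reduction of the version branching in getExternalTmpPath.
  def tempPathStyle(hiveVersion: String): String = {
    if (Seq("0.12", "0.13", "0.14", "1.0").exists(hiveVersion.startsWith)) {
      "old-style: under hive.exec.scratchdir"             // Hive <= 1.0
    } else if (Seq("1.1", "1.2").exists(hiveVersion.startsWith)) {
      "new-style: staging dir beside the table location"  // Hive 1.1 / 1.2
    } else {
      throw new IllegalStateException("Unsupported hive version: " + hiveVersion)
    }
  }

  assert(tempPathStyle("0.13.1").startsWith("old"))
  assert(tempPathStyle("1.2.1").startsWith("new"))

With the config defaults above, the old style produces a path shaped roughly like /tmp/hive/hive_<timestamp>_<random>-<taskRunnerId>, while the new style resolves to a .hive-staging* directory next to the table location with an -ext-10000 child.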
sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala

Lines changed: 17 additions & 2 deletions
@@ -26,13 +26,15 @@ import org.apache.hadoop.mapred.TextInputFormat
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types.IntegerType
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.tags.ExtendedHiveTest
@@ -45,7 +47,7 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}
  * is not fully tested.
  */
 @ExtendedHiveTest
-class VersionsSuite extends SparkFunSuite with Logging {
+class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSingleton with Logging {
 
   private val clientBuilder = new HiveClientBuilder
   import clientBuilder.buildClient
@@ -525,5 +527,18 @@ class VersionsSuite extends SparkFunSuite with Logging {
       client.reset()
       assert(client.listTables("default").isEmpty)
     }
+
+    ///////////////////////////////////////////////////////////////////////////
+    // End-To-End tests
+    ///////////////////////////////////////////////////////////////////////////
+
+    test(s"$version: CREATE TABLE AS SELECT") {
+      withTable("tbl") {
+        spark.sql("CREATE TABLE tbl AS SELECT 1 AS a")
+        assert(spark.table("tbl").collect().toSeq == Seq(Row(1)))
+      }
+    }
+
+    // TODO: add more tests.
   }
 }
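The new end-to-end test relies on withTable from SQLTestUtils so the table is dropped even if the assertion fails, and since the test name interpolates $version, the same CTAS round trip runs once per Hive version the suite exercises. A simplified sketch of that helper, assuming a spark session in scope (the real implementation lives in org.apache.spark.sql.test.SQLTestUtils):

  // Simplified sketch of SQLTestUtils.withTable: run the body, then
  // drop the named tables even if the body throws.
  def withTable(tableNames: String*)(f: => Unit): Unit = {
    try f finally {
      tableNames.foreach { name =>
        spark.sql(s"DROP TABLE IF EXISTS $name")
      }
    }
  }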
