diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 82c7b1a3c6b81..aa858e808edf7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -22,6 +22,8 @@ import java.net.URI
 import java.text.SimpleDateFormat
 import java.util.{Date, Locale, Random}
 
+import scala.util.control.NonFatal
+
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.common.FileUtils
 import org.apache.hadoop.hive.ql.exec.TaskRunner
@@ -84,6 +86,7 @@ case class InsertIntoHiveTable(
   def output: Seq[Attribute] = Seq.empty
 
   val hadoopConf = sessionState.newHadoopConf()
+  var createdTempDir: Option[Path] = None
   val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
   val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
 
@@ -111,12 +114,12 @@ case class InsertIntoHiveTable(
       if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
         throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'")
       }
+      createdTempDir = Some(dir)
       fs.deleteOnExit(dir)
     } catch {
       case e: IOException =>
         throw new RuntimeException(
          "Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)
-
    }
    return dir
  }
@@ -163,11 +166,11 @@ case class InsertIntoHiveTable(
       if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) {
         throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString)
       }
+      createdTempDir = Some(dirPath)
       fs.deleteOnExit(dirPath)
     } catch {
       case e: IOException =>
         throw new RuntimeException("Cannot create staging directory: " + dirPath.toString, e)
-
     }
     dirPath
   }
@@ -378,6 +381,15 @@ case class InsertIntoHiveTable(
         isSrcLocal = false)
     }
 
+    // Attempt to delete the staging directory and the inclusive files. If failed, the files are
+    // expected to be dropped at the normal termination of VM since deleteOnExit is used.
+    try {
+      createdTempDir.foreach { path => path.getFileSystem(hadoopConf).delete(path, true) }
+    } catch {
+      case NonFatal(e) =>
+        logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
+    }
+
     // Invalidate the cache.
     sqlContext.sharedState.cacheManager.invalidateCache(table)
     sqlContext.sessionState.catalog.refreshTable(table.catalogTable.identifier)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 9b26383a162dd..8dd06998ba3c8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -546,6 +546,30 @@ class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSingleton {
       }
     }
 
+    test(s"$version: Delete the temporary staging directory and files after each insert") {
+      withTempDir { tmpDir =>
+        withTable("tab") {
+          spark.sql(
+            s"""
+               |CREATE TABLE tab(c1 string)
+               |location '${tmpDir.toURI.toString}'
+             """.stripMargin)
+
+          (1 to 3).map { i =>
+            spark.sql(s"INSERT OVERWRITE TABLE tab SELECT '$i'")
+          }
+          def listFiles(path: File): List[String] = {
+            val dir = path.listFiles()
+            val folders = dir.filter(_.isDirectory).toList
+            val filePaths = dir.map(_.getName).toList
+            folders.flatMap(listFiles) ++: filePaths
+          }
+          val expectedFiles = ".part-00000.crc" :: "part-00000" :: Nil
+          assert(listFiles(tmpDir).sorted == expectedFiles)
+        }
+      }
+    }
+
     // TODO: add more tests.
   }
 }