@@ -22,6 +22,8 @@ import java.net.URI
import java.text.SimpleDateFormat
import java.util.{Date, Locale, Random}

import scala.util.control.NonFatal

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.common.FileUtils
import org.apache.hadoop.hive.ql.exec.TaskRunner
@@ -84,6 +86,7 @@ case class InsertIntoHiveTable(
def output: Seq[Attribute] = Seq.empty

val hadoopConf = sessionState.newHadoopConf()
var createdTempDir: Option[Path] = None
val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")

@@ -111,12 +114,12 @@ case class InsertIntoHiveTable(
if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'")
}
createdTempDir = Some(dir)
fs.deleteOnExit(dir)
} catch {
case e: IOException =>
throw new RuntimeException(
"Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)

}
return dir
}
@@ -163,11 +166,11 @@ case class InsertIntoHiveTable(
if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) {
throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString)
}
createdTempDir = Some(dirPath)
fs.deleteOnExit(dirPath)
} catch {
case e: IOException =>
throw new RuntimeException("Cannot create staging directory: " + dirPath.toString, e)

}
dirPath
}
@@ -378,6 +381,15 @@ case class InsertIntoHiveTable(
isSrcLocal = false)
}

// Attempt to delete the staging directory and the included files. If it fails, the files are
// expected to be dropped at the normal termination of the VM since deleteOnExit is used.
try {
createdTempDir.foreach { path => path.getFileSystem(hadoopConf).delete(path, true) }
Member Author:

After the call to getExternalTmpPath, tmpLocation points to a child directory of the staging directory. For example,
/x/y/z/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000

Here, we want to drop the whole staging directory, for example /x/y/z/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1.

In the initial fix, I took tmpLocation and called getParent to get the parent directory. Later, I was afraid it might look tricky, so I used the solution here instead: record createdTempDir when the staging directory is created (see the sketch below).
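A minimal sketch of the path relationship described in this comment, using the example path above (Path is org.apache.hadoop.fs.Path; the values and names are illustrative only, not code from this patch):

import org.apache.hadoop.fs.Path

// getExternalTmpPath returns the "-ext-10000" child of the staging directory.
val tmpLocation = new Path(
  "/x/y/z/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000")

// Option 1 (the initial fix): derive the staging directory from tmpLocation.
val stagingViaGetParent = tmpLocation.getParent

// Option 2 (this patch): remember the directory when it is created,
// i.e. createdTempDir = Some(dir) inside getStagingDir / the external-path helpers.
// Both identify the same directory:
assert(stagingViaGetParent.getName.startsWith(".hive-staging"))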

} catch {
case NonFatal(e) =>
logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
}

// Invalidate the cache.
sqlContext.sharedState.cacheManager.invalidateCache(table)

Reviewer:

Should we delete the staging files before or after invalidateCache? Does it matter? Logically, we should invalidate the cache first and then remove the intermediate dataset, so that the cache can be recovered from the files on disk. Am I right? Please clarify.

Member Author:

I think this does not matter. We do not reuse the temporary files in our implementation.

sqlContext.sessionState.catalog.refreshTable(table.catalogTable.identifier)
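A minimal sketch of why the ordering discussed above is not observable, assuming a Hive-enabled SparkSession named spark and an existing table named tab (illustrative only, not code from this patch): the cache is rebuilt from the table's committed data files, never from the staging directory.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
spark.sql("INSERT OVERWRITE TABLE tab SELECT '1'")
// The staging directory only holds intermediate output; the committed files live
// under the table location, so refreshing the invalidated cache re-reads the
// committed files regardless of when the staging directory is deleted.
spark.catalog.refreshTable("tab")
spark.table("tab").show()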
@@ -546,6 +546,30 @@ class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSingleton
}
}

test(s"$version: Delete the temporary staging directory and files after each insert") {
withTempDir { tmpDir =>
withTable("tab") {
spark.sql(
s"""
|CREATE TABLE tab(c1 string)
|location '${tmpDir.toURI.toString}'
""".stripMargin)

(1 to 3).map { i =>
spark.sql(s"INSERT OVERWRITE TABLE tab SELECT '$i'")
}
def listFiles(path: File): List[String] = {
val dir = path.listFiles()
val folders = dir.filter(_.isDirectory).toList
val filePaths = dir.map(_.getName).toList
folders.flatMap(listFiles) ++: filePaths
}
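// Only the final data file and its checksum should remain; no .hive-staging
// directories should be left under the table location.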
val expectedFiles = ".part-00000.crc" :: "part-00000" :: Nil
assert(listFiles(tmpDir).sorted == expectedFiles)
}
}
}

// TODO: add more tests.
}
}