-
Couldn't load subscription status.
- Fork 28.9k
[SPARK-18703] [SQL] Drop Staging Directories and Data Files After each Insertion/CTAS of Hive serde Tables #16134
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
8773c03
34340ee
365ea26
1ed228f
74a676c
16c9da3
5aca883
9da3951
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,6 +22,8 @@ import java.net.URI | |
| import java.text.SimpleDateFormat | ||
| import java.util.{Date, Locale, Random} | ||
|
|
||
| import scala.util.control.NonFatal | ||
|
|
||
| import org.apache.hadoop.fs.{FileSystem, Path} | ||
| import org.apache.hadoop.hive.common.FileUtils | ||
| import org.apache.hadoop.hive.ql.exec.TaskRunner | ||
|
|
@@ -84,6 +86,7 @@ case class InsertIntoHiveTable( | |
| def output: Seq[Attribute] = Seq.empty | ||
|
|
||
| val hadoopConf = sessionState.newHadoopConf() | ||
| var createdTempDir: Option[Path] = None | ||
| val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging") | ||
| val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive") | ||
|
|
||
|
|
@@ -111,12 +114,12 @@ case class InsertIntoHiveTable( | |
| if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) { | ||
| throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'") | ||
| } | ||
| createdTempDir = Some(dir) | ||
| fs.deleteOnExit(dir) | ||
| } catch { | ||
| case e: IOException => | ||
| throw new RuntimeException( | ||
| "Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e) | ||
|
|
||
| } | ||
| return dir | ||
| } | ||
|
|
@@ -163,11 +166,11 @@ case class InsertIntoHiveTable( | |
| if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) { | ||
| throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString) | ||
| } | ||
| createdTempDir = Some(dirPath) | ||
| fs.deleteOnExit(dirPath) | ||
| } catch { | ||
| case e: IOException => | ||
| throw new RuntimeException("Cannot create staging directory: " + dirPath.toString, e) | ||
|
|
||
| } | ||
| dirPath | ||
| } | ||
|
|
@@ -378,6 +381,15 @@ case class InsertIntoHiveTable( | |
| isSrcLocal = false) | ||
| } | ||
|
|
||
| // Attempt to delete the staging directory and the inclusive files. If failed, the files are | ||
| // expected to be dropped at the normal termination of VM since deleteOnExit is used. | ||
| try { | ||
| createdTempDir.foreach { path => path.getFileSystem(hadoopConf).delete(path, true) } | ||
| } catch { | ||
| case NonFatal(e) => | ||
| logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e) | ||
| } | ||
|
|
||
| // Invalidate the cache. | ||
| sqlContext.sharedState.cacheManager.invalidateCache(table) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we delete the staging files before or after the invalidateCache? does it matter? logically, we should invalid cache first, then remove the intermediate dataset s.t the cache can be recovered from the file from disks. am i right? please clarify? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this does not matter. We do not reuse the temporary files in our implementation. |
||
| sqlContext.sessionState.catalog.refreshTable(table.catalogTable.identifier) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we just delete the
tempLocationdefined in https://github.com/apache/spark/pull/16134/files#diff-d579db9a8f27e0bbef37720ab14ec3f6R179?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Through the call of getExternalTmpPath,
tmpLocationis pointing to the child directory of the staging directory. For example,/x/y/z/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000Here, we want to drop the whole staging directory. For example,
/x/y/z/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1At the initial fix, I used
tmpLocationand then callgetParentto get the parent directory. Later, I am afriad it might look tricky. Thus, I used the solution here.