Closed
34 commits
3b44c59
adding testcase
kevinyu98 Apr 20, 2016
18b4a31
Merge remote-tracking branch 'upstream/master'
kevinyu98 Apr 22, 2016
4f4d1c8
Merge remote-tracking branch 'upstream/master'
kevinyu98 Apr 23, 2016
f5f0cbe
Merge remote-tracking branch 'upstream/master'
kevinyu98 Apr 23, 2016
d8b2edb
Merge remote-tracking branch 'upstream/master'
kevinyu98 Apr 25, 2016
196b6c6
Merge remote-tracking branch 'upstream/master'
kevinyu98 Apr 25, 2016
f37a01e
Merge remote-tracking branch 'upstream/master'
kevinyu98 Apr 27, 2016
bb5b01f
Merge remote-tracking branch 'upstream/master'
kevinyu98 Apr 30, 2016
bde5820
Merge remote-tracking branch 'upstream/master'
kevinyu98 May 4, 2016
5f7cd96
Merge remote-tracking branch 'upstream/master'
kevinyu98 May 10, 2016
893a49a
Merge remote-tracking branch 'upstream/master'
kevinyu98 May 13, 2016
4bbe1fd
Merge remote-tracking branch 'upstream/master'
kevinyu98 May 17, 2016
b2dd795
Merge remote-tracking branch 'upstream/master'
kevinyu98 May 18, 2016
8c3e5da
Merge remote-tracking branch 'upstream/master'
kevinyu98 May 18, 2016
a0eaa40
Merge remote-tracking branch 'upstream/master'
kevinyu98 May 19, 2016
d03c940
Merge remote-tracking branch 'upstream/master'
kevinyu98 May 19, 2016
d728d5e
Merge remote-tracking branch 'upstream/master'
kevinyu98 May 24, 2016
ea104dd
Merge remote-tracking branch 'upstream/master'
kevinyu98 May 25, 2016
6ab1215
Merge remote-tracking branch 'upstream/master'
kevinyu98 May 27, 2016
0c56653
Merge remote-tracking branch 'upstream/master'
kevinyu98 Jun 1, 2016
d7a1874
Merge remote-tracking branch 'upstream/master'
kevinyu98 Jun 1, 2016
85d3500
Merge remote-tracking branch 'upstream/master'
kevinyu98 Jun 2, 2016
c056f91
Merge remote-tracking branch 'upstream/master'
kevinyu98 Jun 3, 2016
6dd6ca9
fix7
kevinyu98 Jun 3, 2016
0b8189d
Merge remote-tracking branch 'upstream/master'
kevinyu98 Jun 3, 2016
527749d
Merge branch 'spark-deletefile' into spark-15763
kevinyu98 Jun 3, 2016
c2ea31d
Merge remote-tracking branch 'upstream/master'
kevinyu98 Jun 6, 2016
8767570
fix comments for deleteFile
kevinyu98 Jun 6, 2016
a2d3056
Merge remote-tracking branch 'upstream/master'
kevinyu98 Jun 8, 2016
39e5648
Merge remote-tracking branch 'upstream/master'
kevinyu98 Jun 8, 2016
b9370a3
Merge remote-tracking branch 'upstream/master'
kevinyu98 Jul 25, 2016
01224a4
Merge remote-tracking branch 'upstream/master'
kevinyu98 Aug 3, 2016
d05d39a
Merge remote-tracking branch 'upstream/master'
kevinyu98 Aug 19, 2016
d38a412
Merge branch 'spark-15763' into spark-15763new
kevinyu98 Aug 19, 2016
23 changes: 23 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1457,6 +1457,29 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {
}
}

/**
 * Remove a file added via the given path. The file should have been added by a prior call to
 * `addFile` or the SQL 'ADD FILE' command; only the registration is removed, the file itself
 * is not deleted from disk.
 */
def deleteFile(path: String): Unit = {
Contributor:
this is fairly confusing -- i'd assume this is actually deleting the path given.

Contributor Author:
Hi Reynold, thanks very much for reviewing the code.
Yes, it deletes the path from the addedFiles hashmap; the path is normalized into a key when it is stored in the map.
addFile uses this logic to generate the key before storing it, so in order to find the same entry, deleteFile has to generate the key with the same logic.
For example, for a local file, addFile puts a 'file' scheme in front of the path:

spark.sql("add file /Users/qianyangyu/myfile.txt")

scala> spark.sql("list file").show(false)
+----------------------------------+
|Results |
+----------------------------------+
|file:/Users/qianyangyu/myfile2.txt|
|file:/Users/qianyangyu/myfile.txt |
+----------------------------------+

But for a file at a remote location, addFile just takes the path as-is:

scala> spark.sql("add file hdfs://bdavm009.svl.ibm.com:8020/tmp/test.txt")
res17: org.apache.spark.sql.DataFrame = []

scala> spark.sql("list file").show(false)
+---------------------------------------------+
|Results |
+---------------------------------------------+
|file:/Users/qianyangyu/myfile.txt |
|hdfs://bdavm009.svl.ibm.com:8020/tmp/test.txt|
+---------------------------------------------+

If the command is issued from a worker node and adds a local file, the file is added into the NettyStreamManager's hashmap, and that environment's path is used as the key stored in addedFiles.

Contributor Author:
I have updated the deleteFile comments to make them clearer. Thanks for reviewing.

val uri = new URI(path)
val schemeCorrectedPath = uri.getScheme match {
case null | "local" => new File(path).getCanonicalFile.toURI.toString
case _ => path
}
val scheme = new URI(schemeCorrectedPath).getScheme
val fileName = new File(uri.getPath)
val key = if (!isLocal && scheme == "file") {
env.rpcEnv.fileServer.deleteFile(fileName.getName())
} else {
schemeCorrectedPath
}
addedFiles.remove(key)
val timestamp = System.currentTimeMillis
logInfo("Deleted file " + path + " at " + key + " with timestamp " + timestamp)
postEnvironmentUpdate()
}

/**
* :: DeveloperApi ::
* Register a listener to receive up-calls from events that happen during execution.
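Taken together, the new method can be exercised as in the following minimal sketch (the file name and app name are illustrative; addFile and listFiles are existing SparkContext APIs, deleteFile is the method added above):

import java.io.File
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

object DeleteFileDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("delete-file-demo"))
    try {
      // Create and register a throwaway file.
      val f = File.createTempFile("demo", ".txt")
      Files.write(f.toPath, "hello".getBytes("UTF-8"))
      sc.addFile(f.getAbsolutePath)
      assert(sc.listFiles().exists(_.contains(f.getName)))

      // Unregister it with the same path; only the bookkeeping entry is
      // removed, the file itself stays on disk.
      sc.deleteFile(f.getAbsolutePath)
      assert(!sc.listFiles().exists(_.contains(f.getName)))
    } finally {
      sc.stop()
    }
  }
}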
8 changes: 8 additions & 0 deletions core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
@@ -154,6 +154,14 @@ private[spark] trait RpcEnvFileServer {
*/
def addFile(file: File): String

/**
 * Removes a file previously added to be served by this RpcEnv.
 *
 * @param file Name of the file to remove.
 * @return The URI under which the file was served.
 */
def deleteFile(file: String): String

/**
* Adds a jar to be served by this RpcEnv. Similar to `addFile` but for jars added using
* `SparkContext.addJar`.
@@ -73,6 +73,11 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
s"${rpcEnv.address.toSparkURL}/files/${Utils.encodeFileNameToURIRawPath(file.getName())}"
}

override def deleteFile(file: String): String = {
files.remove(file)
s"${rpcEnv.address.toSparkURL}/files/${Utils.encodeFileNameToURIRawPath(file)}"
}

override def addJar(file: File): String = {
val existingPath = jars.putIfAbsent(file.getName, file)
require(existingPath == null || existingPath == file,
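The removal mirrors the convention used when adding: entries in the files map are keyed by bare file name, not by full path (compare jars.putIfAbsent(file.getName, file) in addJar just above), so deleteFile must be handed that same bare name. A tiny standalone sketch of the assumed convention, with illustrative names:

import java.io.File
import java.util.concurrent.ConcurrentHashMap

val files = new ConcurrentHashMap[String, File]()
val f = new File("/tmp/data.txt")
files.putIfAbsent(f.getName, f)          // stored under the key "data.txt"
assert(files.remove(f.getName) != null)  // removed via the same bare name
assert(files.isEmpty)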
6 changes: 5 additions & 1 deletion core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -108,7 +108,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
assert(byteArray2.length === 0)
}

test("basic case for addFile and listFiles") {
test("basic case for addFile, deleteFile and listFiles") {
val dir = Utils.createTempDir()

val file1 = File.createTempFile("someprefix1", "somesuffix1", dir)
@@ -157,6 +157,10 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
x
}).count()
assert(sc.listFiles().filter(_.contains("somesuffix1")).size == 1)
sc.deleteFile(file1.getAbsolutePath)
assert(sc.listFiles().filter(_.contains("somesuffix1")).size == 0)
sc.deleteFile(relativePath)
assert(sc.listFiles().filter(_.contains("somesuffix2")).size == 0)
} finally {
sc.stop()
}
@@ -123,7 +123,7 @@ statement
tableIdentifier partitionSpec? #loadData
| TRUNCATE TABLE tableIdentifier partitionSpec? #truncateTable
| MSCK REPAIR TABLE tableIdentifier #repairTable
| op=(ADD | LIST) identifier .*? #manageResource
| op=(ADD | DELETE | LIST) identifier .*? #manageResource
| SET ROLE .*? #failNativeCommand
| SET .*? #setConfiguration
| RESET #resetConfiguration
@@ -902,6 +902,11 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
case "jar" => AddJarCommand(mayebePaths)
case other => operationNotAllowed(s"ADD with resource type '$other'", ctx)
}
case SqlBaseParser.DELETE =>
ctx.identifier.getText.toLowerCase match {
case "file" => DeleteFileCommand(mayebePaths)
case other => operationNotAllowed(s"DELETE with resource type '$other'", ctx)
}
case SqlBaseParser.LIST =>
ctx.identifier.getText.toLowerCase match {
case "files" | "file" =>
@@ -99,3 +99,13 @@ case class ListJarsCommand(jars: Seq[String] = Seq.empty[String]) extends RunnableCommand {
}
}
}

/**
* Deletes a file from the current session.
*/
case class DeleteFileCommand(path: String) extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
sparkSession.sparkContext.deleteFile(path)
Seq.empty[Row]
}
}
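With the command wired into the parser and SparkContext, the whole flow can also be driven from the SQL surface. A sketch, assuming a SparkSession named spark and an existing local file (the path is illustrative):

spark.sql("ADD FILE /tmp/lookup.txt")
spark.sql("LIST FILES").show(false)      // shows file:/tmp/lookup.txt
spark.sql("DELETE FILE /tmp/lookup.txt")
spark.sql("LIST FILES").show(false)      // the entry is gone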
@@ -283,4 +283,15 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
"SET conf3;" -> "conftest"
)
}

test("delete files") {
val dataFilePath = Thread.currentThread().
getContextClassLoader.getResource("data/files/small_kv.txt")
runCliWithin(2.minute)(
s"ADD FILE $dataFilePath;" -> "",
s"LIST FILES;" -> "small_kv.txt",
s"DELETE FILE $dataFilePath;" -> "",
s"LIST FILES;" -> ""
)
}
}
@@ -911,6 +911,11 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
assert(sql("list file").
filter(_.getString(0).contains("data/files/v1.txt")).count() > 0)
assert(sql(s"list file $testFile").count() == 1)

sql(s"DELETE FILE $testFile")
assert(sql("list files").filter(_.getString(0).contains("data/files/v1.txt")).count() == 0)
assert(sql("list file").filter(_.getString(0).contains("data/files/v1.txt")).count() == 0)
assert(sql(s"list file $testFile").count() == 0)
}

createQueryTest("dynamic_partition",