diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 2eaeab1d807b4..16797615bfd50 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1457,6 +1457,29 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {
     }
   }
 
+  /**
+   * Delete a file with the given path. The file should have been previously added,
+   * either through `SparkContext.addFile` or the SQL 'ADD FILE' command.
+   */
+  def deleteFile(path: String): Unit = {
+    val uri = new URI(path)
+    val schemeCorrectedPath = uri.getScheme match {
+      case null | "local" => new File(path).getCanonicalFile.toURI.toString
+      case _ => path
+    }
+    val scheme = new URI(schemeCorrectedPath).getScheme
+    val fileName = new File(uri.getPath)
+    val key = if (!isLocal && scheme == "file") {
+      env.rpcEnv.fileServer.deleteFile(fileName.getName())
+    } else {
+      schemeCorrectedPath
+    }
+    addedFiles.remove(key)
+    val timestamp = System.currentTimeMillis
+    logInfo("Deleted file " + path + " at " + key + " with timestamp " + timestamp)
+    postEnvironmentUpdate()
+  }
+
   /**
    * :: DeveloperApi ::
    * Register a listener to receive up-calls from events that happen during execution.
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
index 56683771335a6..49d090659559c 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
@@ -154,6 +154,14 @@ private[spark] trait RpcEnvFileServer {
    */
   def addFile(file: File): String
 
+  /**
+   * Deletes a file previously served by this RpcEnv.
+   *
+   * @param file Name of the file to stop serving.
+   * @return The URI at which the file was served.
+   */
+  def deleteFile(file: String): String
+
   /**
    * Adds a jar to be served by this RpcEnv. Similar to `addFile` but for jars added using
    * `SparkContext.addJar`.
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
index 780fadd5bda8e..e5e4f0c6fe6dc 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
@@ -73,6 +73,11 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
     s"${rpcEnv.address.toSparkURL}/files/${Utils.encodeFileNameToURIRawPath(file.getName())}"
   }
 
+  override def deleteFile(file: String): String = {
+    files.remove(file)
+    s"${rpcEnv.address.toSparkURL}/files/${Utils.encodeFileNameToURIRawPath(file)}"
+  }
+
   override def addJar(file: File): String = {
     val existingPath = jars.putIfAbsent(file.getName, file)
     require(existingPath == null || existingPath == file,
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index f8d143dc610cb..fd1c6ef491661 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -108,7 +108,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
     assert(byteArray2.length === 0)
   }
 
-  test("basic case for addFile and listFiles") {
+  test("basic case for addFile, deleteFile and listFiles") {
     val dir = Utils.createTempDir()
 
     val file1 = File.createTempFile("someprefix1", "somesuffix1", dir)
@@ -157,6 +157,10 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
         x
       }).count()
       assert(sc.listFiles().filter(_.contains("somesuffix1")).size == 1)
+      sc.deleteFile(file1.getAbsolutePath)
+      assert(sc.listFiles().filter(_.contains("somesuffix1")).size == 0)
+      sc.deleteFile(relativePath)
+      assert(sc.listFiles().filter(_.contains("somesuffix2")).size == 0)
     } finally {
       sc.stop()
     }
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index cab7c3ff5a8f7..6b76ccc066f87 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -123,7 +123,7 @@ statement
         tableIdentifier partitionSpec?                                 #loadData
     | TRUNCATE TABLE tableIdentifier partitionSpec?                    #truncateTable
     | MSCK REPAIR TABLE tableIdentifier                                #repairTable
-    | op=(ADD | LIST) identifier .*?                                   #manageResource
+    | op=(ADD | DELETE | LIST) identifier .*?                          #manageResource
     | SET ROLE .*?                                                     #failNativeCommand
     | SET .*?                                                          #setConfiguration
    | RESET                                                            #resetConfiguration
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 71c3bd31e02e4..f71a71de74390 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -902,6 +902,11 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
           case "jar" => AddJarCommand(mayebePaths)
           case other => operationNotAllowed(s"ADD with resource type '$other'", ctx)
         }
+      case SqlBaseParser.DELETE =>
+        ctx.identifier.getText.toLowerCase match {
+          case "file" => DeleteFileCommand(mayebePaths)
+          case other => operationNotAllowed(s"DELETE with resource type '$other'", ctx)
+        }
       case SqlBaseParser.LIST =>
         ctx.identifier.getText.toLowerCase match {
           case "files" | "file" =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala
index 20b08946675d0..cfdd13306d5a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala
@@ -99,3 +99,13 @@ case class ListJarsCommand(jars: Seq[String] = Seq.empty[String]) extends RunnableCommand {
     }
   }
 }
+
+/**
+ * Deletes a file from the current session.
+ */
+case class DeleteFileCommand(path: String) extends RunnableCommand {
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    sparkSession.sparkContext.deleteFile(path)
+    Seq.empty[Row]
+  }
+}
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index d3cec11bd7567..c32b8e6c979f2 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -283,4 +283,15 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
       "SET conf3;" -> "conftest"
     )
   }
+
+  test("delete files") {
+    val dataFilePath = Thread.currentThread().
+      getContextClassLoader.getResource("data/files/small_kv.txt")
+    runCliWithin(2.minute)(
+      s"ADD FILE $dataFilePath;" -> "",
+      s"LIST FILES;" -> "small_kv.txt",
+      s"DELETE FILE $dataFilePath;" -> "",
+      s"LIST FILES;" -> ""
+    )
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 6785167d3dfba..3f309fdd70f96 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -911,6 +911,11 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
     assert(sql("list file").
       filter(_.getString(0).contains("data/files/v1.txt")).count() > 0)
     assert(sql(s"list file $testFile").count() == 1)
+
+    sql(s"DELETE FILE $testFile")
+    assert(sql("list files").filter(_.getString(0).contains("data/files/v1.txt")).count() == 0)
+    assert(sql("list file").filter(_.getString(0).contains("data/files/v1.txt")).count() == 0)
+    assert(sql(s"list file $testFile").count() == 0)
   }
 
   createQueryTest("dynamic_partition",
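
Reviewer note (not part of the patch): a minimal usage sketch of the round trip this diff enables, through both the Scala API and the new SQL command. It assumes a build with this patch applied; the object name and temp-file setup are illustrative only.

// Illustrative only: exercises addFile/deleteFile/listFiles and the
// DELETE FILE SQL command introduced by this patch.
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object DeleteFileExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("delete-file-example")
      .getOrCreate()
    val sc = spark.sparkContext

    // Register a throwaway file with the session.
    val tmp = Files.createTempFile("example", ".txt").toFile
    sc.addFile(tmp.getAbsolutePath)
    assert(sc.listFiles().exists(_.contains(tmp.getName)))

    // New API from this patch: remove the file from the session again.
    sc.deleteFile(tmp.getAbsolutePath)
    assert(!sc.listFiles().exists(_.contains(tmp.getName)))

    // The same round trip through the SQL commands wired up in SparkSqlParser.
    spark.sql(s"ADD FILE ${tmp.getAbsolutePath}")
    assert(spark.sql("LIST FILES").collect().exists(_.getString(0).contains(tmp.getName)))
    spark.sql(s"DELETE FILE ${tmp.getAbsolutePath}")
    assert(!spark.sql("LIST FILES").collect().exists(_.getString(0).contains(tmp.getName)))

    spark.stop()
  }
}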