From aeb10d100a24ca644745fb8b26985b584fd5118e Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Tue, 28 Feb 2017 23:29:17 +0800 Subject: [PATCH 1/5] Add support for `fileNameOnly` --- .../streaming/FileStreamOptions.scala | 25 ++++++++++++------- .../streaming/FileStreamSource.scala | 18 ++++++++++--- .../sql/streaming/FileStreamSourceSuite.scala | 22 ++++++++++++++-- 3 files changed, 50 insertions(+), 15 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala index 2f802d782f5ad..86a425455e30f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala @@ -58,13 +58,20 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging * Whether to scan latest files first. If it's true, when the source finds unprocessed files in a * trigger, it will first process the latest files. */ - val latestFirst: Boolean = parameters.get("latestFirst").map { str => - try { - str.toBoolean - } catch { - case _: IllegalArgumentException => - throw new IllegalArgumentException( - s"Invalid value '$str' for option 'latestFirst', must be 'true' or 'false'") - } - }.getOrElse(false) + val latestFirst: Boolean = withBooleanParameter("latestFirst", false) + + /** Whether to check new files based on only the filename instead of on the full path. */ + val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false) + + private def withBooleanParameter(name: String, default: Boolean) = { + parameters.get(name).map { str => + try { + str.toBoolean + } catch { + case _: IllegalArgumentException => + throw new IllegalArgumentException( + s"Invalid value '$str' for option '${name}', must be 'true' or 'false'") + } + }.getOrElse(default) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 6a7263ca45d85..b78084a056408 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.streaming +import java.net.URI + import scala.collection.JavaConverters._ import org.apache.hadoop.fs.{FileStatus, Path} @@ -75,7 +77,7 @@ class FileStreamSource( /** A mapping from a file that we have processed to some timestamp it was last modified. */ // Visible for testing and debugging in production. - val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs) + val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs, sourceOptions.fileNameOnly) metadataLog.allFiles().foreach { entry => seenFiles.add(entry.path, entry.timestamp) @@ -262,7 +264,7 @@ object FileStreamSource { * To prevent the hash map from growing indefinitely, a purge function is available to * remove files "maxAgeMs" older than the latest file. */ - class SeenFilesMap(maxAgeMs: Long) { + class SeenFilesMap(maxAgeMs: Long, fileNameOnly: Boolean) { require(maxAgeMs >= 0) /** Mapping from file to its timestamp. */ @@ -274,9 +276,13 @@ object FileStreamSource { /** Timestamp for the last purge operation. 
*/ private var lastPurgeTimestamp: Timestamp = 0L + @inline private def stripPathIfNecessary(path: String) = { + if (fileNameOnly) new Path(new URI(path)).getName else path + } + /** Add a new file to the map. */ def add(path: String, timestamp: Timestamp): Unit = { - map.put(path, timestamp) + map.put(stripPathIfNecessary(path), timestamp) if (timestamp > latestTimestamp) { latestTimestamp = timestamp } @@ -289,7 +295,7 @@ object FileStreamSource { def isNewFile(path: String, timestamp: Timestamp): Boolean = { // Note that we are testing against lastPurgeTimestamp here so we'd never miss a file that // is older than (latestTimestamp - maxAgeMs) but has not been purged yet. - timestamp >= lastPurgeTimestamp && !map.containsKey(path) + timestamp >= lastPurgeTimestamp && !map.containsKey(stripPathIfNecessary(path)) } /** Removes aged entries and returns the number of files removed. */ @@ -309,6 +315,10 @@ object FileStreamSource { def size: Int = map.size() + /** + * Note when `fileNameOnly` is true, each entry would be (file name, timestamp) rather than + * (full path, timestamp). + */ def allEntries: Seq[(String, Timestamp)] = { map.asScala.toSeq } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 1586850c77fca..4f4bf7cf35ee9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -1220,7 +1220,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } test("SeenFilesMap") { - val map = new SeenFilesMap(maxAgeMs = 10) + val map = new SeenFilesMap(maxAgeMs = 10, fileNameOnly = false) map.add("a", 5) assert(map.size == 1) @@ -1253,8 +1253,26 @@ class FileStreamSourceSuite extends FileStreamSourceTest { assert(map.isNewFile("e", 20)) } + test("SeenFilesMap with fileNameOnly = true") { + val map = new SeenFilesMap(maxAgeMs = 10, fileNameOnly = true) + + map.add("file:///a/b/c/d", 5) + map.add("file:///a/b/c/e", 5) + assert(map.size == 2) + + assert(!map.isNewFile("d", 5)) + assert(!map.isNewFile("file:///d", 5)) + assert(!map.isNewFile("file:///x/d", 5)) + assert(!map.isNewFile("file:///x/y/d", 5)) + + map.add("s3:///bucket/d", 5) + map.add("s3n:///bucket/d", 5) + map.add("s3a:///bucket/d", 5) + assert(map.size == 2) + } + test("SeenFilesMap should only consider a file old if it is earlier than last purge time") { - val map = new SeenFilesMap(maxAgeMs = 10) + val map = new SeenFilesMap(maxAgeMs = 10, fileNameOnly = false) map.add("a", 20) assert(map.size == 1) From 2354ae69f8906e654a6881fb87b7aabfe008885e Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Sat, 4 Mar 2017 10:09:09 +0800 Subject: [PATCH 2/5] Explicit docs about the expectations of the filename --- docs/structured-streaming-programming-guide.md | 12 ++++++++++-- .../sql/execution/streaming/FileStreamOptions.scala | 11 ++++++++++- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 6af47b6efba2c..995ac77a4fb3b 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -1052,10 +1052,18 @@ Here are the details of all the sinks in Spark. Append path: path to the output directory, must be specified. +
maxFilesPerTrigger: maximum number of new files to be considered in every trigger (default: no max)
- latestFirst: whether to processs the latest new files first, useful when there is a large backlog of files(default: false) -

+ latestFirst: whether to process the latest new files first, useful when there is a large backlog of files (default: false)
+ fileNameOnly: whether to check new files based on only the filename instead of on the full path (default: false). With this set to `true`, the following files would be considered the same file, because their filenames, "dataset.txt", are the same:
+ · "file:///dataset.txt"
+ · "s3://a/dataset.txt"
+ · "s3n://a/b/dataset.txt"
+ · "s3a://a/b/c/dataset.txt"
+
For file-format-specific options, see the related methods in DataFrameWriter (Scala/Java/Python). E.g. for "parquet" format options see DataFrameWriter.parquet() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala index 86a425455e30f..3d73f384e22fe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala @@ -60,7 +60,16 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging */ val latestFirst: Boolean = withBooleanParameter("latestFirst", false) - /** Whether to check new files based on only the filename instead of on the full path. */ + /** + * Whether to check new files based on only the filename instead of on the full path. + * + * With this set to `true`, the following files would be considered as the same file, because + * their filenames, "dataset.txt", are the same: + * - "file:///dataset.txt" + * - "s3://a/dataset.txt" + * - "s3n://a/b/dataset.txt" + * - "s3a://a/b/c/dataset.txt" + */ val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false) private def withBooleanParameter(name: String, default: Boolean) = { From f9e525e54de1ceb98fa3e21c92969559c65a24fd Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Sat, 4 Mar 2017 21:07:34 +0800 Subject: [PATCH 3/5] Address comments from Steve --- .../apache/spark/sql/streaming/FileStreamSourceSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 4f4bf7cf35ee9..f79b96c4a60fb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -1258,7 +1258,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { map.add("file:///a/b/c/d", 5) map.add("file:///a/b/c/e", 5) - assert(map.size == 2) + assert(map.size === 2) assert(!map.isNewFile("d", 5)) assert(!map.isNewFile("file:///d", 5)) @@ -1268,7 +1268,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { map.add("s3:///bucket/d", 5) map.add("s3n:///bucket/d", 5) map.add("s3a:///bucket/d", 5) - assert(map.size == 2) + assert(map.size === 2) } test("SeenFilesMap should only consider a file old if it is earlier than last purge time") { From 7da2a9cccd6c18e1c0eda542977aec00eca33459 Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Thu, 9 Mar 2017 11:30:28 +0800 Subject: [PATCH 4/5] Address @zsxwing's comments --- .../execution/streaming/FileStreamOptions.scala | 2 +- .../execution/streaming/FileStreamSource.scala | 17 ++++++++--------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala index 8b0b31a8e4039..d54ed44b43bf1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala @@ -82,7 +82,7 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging } catch { case _: IllegalArgumentException => throw new 
IllegalArgumentException( - s"Invalid value '$str' for option '${name}', must be 'true' or 'false'") + s"Invalid value '$str' for option '$name', must be 'true' or 'false'") } }.getOrElse(default) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index cd14e0a268f9e..214151c5fa29b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -81,9 +81,16 @@ class FileStreamSource( sourceOptions.maxFileAgeMs } + private val fileNameOnly = sourceOptions.fileNameOnly + if (fileNameOnly) { + logWarning("'fileNameOnly' is enabled. Make sure your file names are unique (e.g. using " + + "UUID), otherwise, files using the same name will be considered as the same file and causes" + + " data lost") + } + /** A mapping from a file that we have processed to some timestamp it was last modified. */ // Visible for testing and debugging in production. - val seenFiles = new SeenFilesMap(maxFileAgeMs, sourceOptions.fileNameOnly) + val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly) metadataLog.allFiles().foreach { entry => seenFiles.add(entry.path, entry.timestamp) @@ -320,13 +327,5 @@ object FileStreamSource { } def size: Int = map.size() - - /** - * Note when `fileNameOnly` is true, each entry would be (file name, timestamp) rather than - * (full path, timestamp). - */ - def allEntries: Seq[(String, Timestamp)] = { - map.asScala.toSeq - } } } From aab75540b314abcd1b4fe557bb7a4aa2b7bdc072 Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Thu, 9 Mar 2017 14:58:54 +0800 Subject: [PATCH 5/5] Adjust wording as per Felix's comments --- .../spark/sql/execution/streaming/FileStreamSource.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 214151c5fa29b..411a15ffceb6a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -84,8 +84,8 @@ class FileStreamSource( private val fileNameOnly = sourceOptions.fileNameOnly if (fileNameOnly) { logWarning("'fileNameOnly' is enabled. Make sure your file names are unique (e.g. using " + - "UUID), otherwise, files using the same name will be considered as the same file and causes" + - " data lost") + "UUID), otherwise, files with the same name but under different paths will be considered " + + "the same and cause data loss.")
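
A note on the dedup semantics introduced above: with `fileNameOnly` enabled, `SeenFilesMap` keys every entry on `new Path(new URI(path)).getName`, the same expression used by `stripPathIfNecessary` in PATCH 1/5. The sketch below makes that concrete with the four example URIs from the docs change; it assumes only that `hadoop-common` is on the classpath, and the helper name `stripped` is illustrative, not part of the patch:

```scala
import java.net.URI

import org.apache.hadoop.fs.Path

object FileNameOnlySemanticsSketch {
  // Same expression as stripPathIfNecessary in the patch: keep only the
  // final path segment of the URI, discarding scheme, bucket and directories.
  def stripped(path: String): String = new Path(new URI(path)).getName

  def main(args: Array[String]): Unit = {
    val uris = Seq(
      "file:///dataset.txt",
      "s3://a/dataset.txt",
      "s3n://a/b/dataset.txt",
      "s3a://a/b/c/dataset.txt")
    // All four collapse to the single key "dataset.txt", so with
    // fileNameOnly = true, SeenFilesMap treats them as one already-seen file.
    uris.foreach(u => assert(stripped(u) == "dataset.txt"))
  }
}
```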
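For completeness, a minimal sketch of how an end user would turn the option on through `DataStreamReader`. The option keys are the ones defined in `FileStreamOptions` above; the `text` format, the console sink, the local master, and the input directory `/tmp/streaming-input` are illustrative assumptions:

```scala
import org.apache.spark.sql.SparkSession

object FileNameOnlyUsageSketch {
  def main(args: Array[String]): Unit = {
    // Local master only for the sake of a self-contained sketch.
    val spark = SparkSession.builder()
      .appName("fileNameOnly-usage")
      .master("local[*]")
      .getOrCreate()

    // Hypothetical input directory; any file-based source format works the same way.
    val lines = spark.readStream
      .format("text")
      .option("latestFirst", "true")   // drain the newest backlog first
      .option("fileNameOnly", "true")  // dedup on file name, not full path
      .load("/tmp/streaming-input")

    val query = lines.writeStream
      .format("console")
      .start()
    query.awaitTermination()
  }
}
```

As the warning added in PATCH 4/5 stresses, this is only safe when file names are unique across directories (e.g. UUID-based names); otherwise distinct files collapse into a single seen-files entry and are never processed.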