12 changes: 10 additions & 2 deletions docs/structured-streaming-programming-guide.md
@@ -1052,10 +1052,18 @@ Here are the details of all the sinks in Spark.
 <td>Append</td>
 <td>
 <code>path</code>: path to the output directory, must be specified.
 <br/>
 <code>maxFilesPerTrigger</code>: maximum number of new files to be considered in every trigger (default: no max)
 <br/>
-<code>latestFirst</code>: whether to process the latest new files first, useful when there is a large backlog of files(default: false)
-<br/><br/>
+<code>latestFirst</code>: whether to process the latest new files first, useful when there is a large backlog of files (default: false)
+<br/>
+<code>fileNameOnly</code>: whether to check new files based on only the filename instead of on the full path (default: false). With this set to `true`, the following files would be considered as the same file, because their filenames, "dataset.txt", are the same:
+<br/>
+· "file:///dataset.txt"<br/>
+· "s3://a/dataset.txt"<br/>
+· "s3n://a/b/dataset.txt"<br/>
+· "s3a://a/b/c/dataset.txt"<br/>
Contributor Author commented: the indentation of a <li> does not look pretty, so I'm using a dot here.
+<br/>
 For file-format-specific options, see the related methods in DataFrameWriter
 (<a href="api/scala/index.html#org.apache.spark.sql.DataFrameWriter">Scala</a>/<a href="api/java/org/apache/spark/sql/DataFrameWriter.html">Java</a>/<a href="api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter">Python</a>).
 E.g. for "parquet" format options see <code>DataFrameWriter.parquet()</code>
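
As a usage illustration (not part of this patch), here is a minimal sketch of passing these options to a streaming file read; the format, option values, and input path are placeholder assumptions:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("file-source-sketch").getOrCreate()

val stream = spark.readStream
  .format("text")
  .option("maxFilesPerTrigger", "100")  // cap on new files considered per trigger
  .option("latestFirst", "true")        // process the newest files first on a backlog
  .option("fileNameOnly", "true")       // track seen files by name only, not full path
  .load("/data/incoming")               // placeholder input directory
```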

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
@@ -61,13 +61,29 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
    * Whether to scan latest files first. If it's true, when the source finds unprocessed files in a
    * trigger, it will first process the latest files.
    */
-  val latestFirst: Boolean = parameters.get("latestFirst").map { str =>
-    try {
-      str.toBoolean
-    } catch {
-      case _: IllegalArgumentException =>
-        throw new IllegalArgumentException(
-          s"Invalid value '$str' for option 'latestFirst', must be 'true' or 'false'")
-    }
-  }.getOrElse(false)
+  val latestFirst: Boolean = withBooleanParameter("latestFirst", false)
+
+  /**
+   * Whether to check new files based on only the filename instead of on the full path.
+   *
+   * With this set to `true`, the following files would be considered as the same file, because
+   * their filenames, "dataset.txt", are the same:
+   * - "file:///dataset.txt"
+   * - "s3://a/dataset.txt"
+   * - "s3n://a/b/dataset.txt"
+   * - "s3a://a/b/c/dataset.txt"
+   */
+  val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false)
+
+  private def withBooleanParameter(name: String, default: Boolean) = {
+    parameters.get(name).map { str =>
+      try {
+        str.toBoolean
+      } catch {
+        case _: IllegalArgumentException =>
+          throw new IllegalArgumentException(
+            s"Invalid value '$str' for option '$name', must be 'true' or 'false'")
+      }
+    }.getOrElse(default)
+  }
 }
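
To make the refactored parsing concrete, here is a hedged sketch of how `withBooleanParameter` resolves values (assuming the catalyst `CaseInsensitiveMap` shown in the class signature; exact helper availability may vary by branch):

```scala
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.streaming.FileStreamOptions

// String.toBoolean is case-insensitive, so "TRUE" parses cleanly:
val opts = new FileStreamOptions(CaseInsensitiveMap(Map("fileNameOnly" -> "TRUE")))
assert(opts.fileNameOnly)   // explicitly set
assert(!opts.latestFirst)   // absent key falls back to the default (false)

// Any other value fails fast:
// new FileStreamOptions(CaseInsensitiveMap(Map("latestFirst" -> "yes")))
//   => IllegalArgumentException: Invalid value 'yes' for option 'latestFirst', must be 'true' or 'false'
```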

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -17,6 +17,8 @@

 package org.apache.spark.sql.execution.streaming

+import java.net.URI
+
 import scala.collection.JavaConverters._

 import org.apache.hadoop.fs.{FileStatus, Path}
@@ -79,9 +81,16 @@ class FileStreamSource(
     sourceOptions.maxFileAgeMs
   }

+  private val fileNameOnly = sourceOptions.fileNameOnly
+  if (fileNameOnly) {
+    logWarning("'fileNameOnly' is enabled. Make sure your file names are unique (e.g. using " +
+      "UUID), otherwise, files with the same name but under different paths will be considered " +
+      "the same and cause data loss.")
+  }
+
   /** A mapping from a file that we have processed to some timestamp it was last modified. */
   // Visible for testing and debugging in production.
-  val seenFiles = new SeenFilesMap(maxFileAgeMs)
+  val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly)

   metadataLog.allFiles().foreach { entry =>
     seenFiles.add(entry.path, entry.timestamp)
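
The warning exists because name collisions silently drop data. A hedged sketch (bucket names invented for illustration) of the failure mode, driven through `SeenFilesMap` directly:

```scala
import org.apache.spark.sql.execution.streaming.FileStreamSource.SeenFilesMap

val seen = new SeenFilesMap(maxAgeMs = 60000, fileNameOnly = true)
seen.add("s3://bucket-a/dataset.txt", 1000L)

// A different object that merely shares the file name is no longer "new",
// so the source would skip it — exactly the data loss the warning describes:
assert(!seen.isNewFile("s3://bucket-b/dataset.txt", 1000L))
```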
@@ -268,7 +277,7 @@ object FileStreamSource {
    * To prevent the hash map from growing indefinitely, a purge function is available to
    * remove files "maxAgeMs" older than the latest file.
    */
-  class SeenFilesMap(maxAgeMs: Long) {
+  class SeenFilesMap(maxAgeMs: Long, fileNameOnly: Boolean) {
     require(maxAgeMs >= 0)

     /** Mapping from file to its timestamp. */
@@ -280,9 +289,13 @@
     /** Timestamp for the last purge operation. */
     private var lastPurgeTimestamp: Timestamp = 0L

+    @inline private def stripPathIfNecessary(path: String) = {
+      if (fileNameOnly) new Path(new URI(path)).getName else path
+    }
+
     /** Add a new file to the map. */
     def add(path: String, timestamp: Timestamp): Unit = {
-      map.put(path, timestamp)
+      map.put(stripPathIfNecessary(path), timestamp)
       if (timestamp > latestTimestamp) {
         latestTimestamp = timestamp
       }
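
For reference, a small sketch of what the `Path`/`URI` round-trip in `stripPathIfNecessary` yields, assuming standard `java.net.URI` and `org.apache.hadoop.fs.Path` semantics:

```scala
import java.net.URI
import org.apache.hadoop.fs.Path

// getName keeps only the final path component; scheme, authority and
// parent directories are all discarded:
assert(new Path(new URI("file:///a/b/c/dataset.txt")).getName == "dataset.txt")
assert(new Path(new URI("s3n://a/b/dataset.txt")).getName == "dataset.txt")
```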
@@ -295,7 +308,7 @@ object FileStreamSource {
     def isNewFile(path: String, timestamp: Timestamp): Boolean = {
       // Note that we are testing against lastPurgeTimestamp here so we'd never miss a file that
       // is older than (latestTimestamp - maxAgeMs) but has not been purged yet.
-      timestamp >= lastPurgeTimestamp && !map.containsKey(path)
+      timestamp >= lastPurgeTimestamp && !map.containsKey(stripPathIfNecessary(path))
     }
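
The comment's reasoning can be checked with a tiny worked example (values are illustrative):

```scala
val m = new SeenFilesMap(maxAgeMs = 10, fileNameOnly = false)
m.add("a", 20)   // latestTimestamp becomes 20, but lastPurgeTimestamp is still 0

// Timestamp 5 is older than latestTimestamp - maxAgeMs = 10, yet no purge has
// run, so the file is still (correctly) reported as new:
assert(m.isNewFile("b", 5))
```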

/** Removes aged entries and returns the number of files removed. */
@@ -314,9 +327,5 @@ object FileStreamSource {
     }

     def size: Int = map.size()
-
-    def allEntries: Seq[(String, Timestamp)] = {
-      map.asScala.toSeq
-    }
   }
 }

sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1236,7 +1236,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
   }

   test("SeenFilesMap") {
-    val map = new SeenFilesMap(maxAgeMs = 10)
+    val map = new SeenFilesMap(maxAgeMs = 10, fileNameOnly = false)

     map.add("a", 5)
     assert(map.size == 1)
@@ -1269,8 +1269,26 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
assert(map.isNewFile("e", 20))
}

test("SeenFilesMap with fileNameOnly = true") {
val map = new SeenFilesMap(maxAgeMs = 10, fileNameOnly = true)

map.add("file:///a/b/c/d", 5)
map.add("file:///a/b/c/e", 5)
assert(map.size === 2)

assert(!map.isNewFile("d", 5))
assert(!map.isNewFile("file:///d", 5))
assert(!map.isNewFile("file:///x/d", 5))
assert(!map.isNewFile("file:///x/y/d", 5))

map.add("s3:///bucket/d", 5)
map.add("s3n:///bucket/d", 5)
map.add("s3a:///bucket/d", 5)
assert(map.size === 2)
}

test("SeenFilesMap should only consider a file old if it is earlier than last purge time") {
val map = new SeenFilesMap(maxAgeMs = 10)
val map = new SeenFilesMap(maxAgeMs = 10, fileNameOnly = false)

map.add("a", 20)
assert(map.size == 1)
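
These tests should run with the standard Spark sbt workflow, e.g. `build/sbt "sql/testOnly *FileStreamSourceSuite"` (assuming the usual build setup; the exact invocation may differ by branch).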