18 changes: 9 additions & 9 deletions docs/structured-streaming-programming-guide.md
@@ -539,7 +539,7 @@ spark = SparkSession. ...

# Read text from socket
socketDF = spark \
-    .readStream() \
+    .readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9999) \
@@ -552,7 +552,7 @@ socketDF.printSchema()
# Read all the csv files written atomically in a directory
userSchema = StructType().add("name", "string").add("age", "integer")
csvDF = spark \
-    .readStream() \
+    .readStream \
.option("sep", ";") \
.schema(userSchema) \
.csv("/path/to/directory") # Equivalent to format("csv").load("/path/to/directory")
@@ -971,7 +971,7 @@ Here is the compatibility matrix.
<br/><br/>
Update mode uses watermark to drop old aggregation state.
<br/><br/>
-Complete mode does drop not old aggregation state since by definition this mode
+Complete mode does not drop old aggregation state since by definition this mode
preserves all data in the Result Table.
</td>
</tr>
@@ -1201,13 +1201,13 @@ noAggDF = deviceDataDf.select("device").where("signal > 10")

# Print new data to console
noAggDF \
-    .writeStream() \
+    .writeStream \
.format("console") \
.start()

# Write new data to Parquet files
noAggDF \
-    .writeStream() \
+    .writeStream \
.format("parquet") \
.option("checkpointLocation", "path/to/checkpoint/dir") \
.option("path", "path/to/destination/dir") \
@@ -1218,14 +1218,14 @@ aggDF = df.groupBy("device").count()

# Print updated aggregations to console
aggDF \
-    .writeStream() \
+    .writeStream \
.outputMode("complete") \
.format("console") \
.start()

# Have all the aggregates in an in-memory table. The query name will be the table name
aggDF \
-    .writeStream() \
+    .writeStream \
.queryName("aggregates") \
.outputMode("complete") \
.format("memory") \
@@ -1313,7 +1313,7 @@ query.lastProgress(); // the most recent progress update of this streaming query
<div data-lang="python" markdown="1">

{% highlight python %}
-query = df.writeStream().format("console").start()   # get the query object
+query = df.writeStream.format("console").start()   # get the query object

query.id() # get the unique identifier of the running query that persists across restarts from checkpoint data

@@ -1658,7 +1658,7 @@ aggDF

{% highlight python %}
aggDF \
-    .writeStream() \
+    .writeStream \
.outputMode("complete") \
.option("checkpointLocation", "path/to/HDFS/dir") \
.format("memory") \
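A note on the doc fixes above: in PySpark, `readStream` (on `SparkSession`) and `writeStream` (on `DataFrame`) are properties rather than methods, so the trailing parentheses in the old snippets would fail at runtime with a TypeError. A minimal sketch of the corrected pattern (app name, host, and port are just placeholders):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("example").getOrCreate()

# readStream is a property returning a DataStreamReader;
# spark.readStream() would raise "'DataStreamReader' object is not callable".
socketDF = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# writeStream is likewise a property returning a DataStreamWriter.
query = socketDF.writeStream \
    .format("console") \
    .start()
```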
@@ -97,7 +97,7 @@ class FileStreamSource(
}
seenFiles.purge()

logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAge = $maxFileAgeMs")
logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs")

/**
* Returns the maximum offset that can be retrieved from the source.
@@ -230,7 +230,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
* - It must pass the user-provided file filter.
* - It must be newer than the ignore threshold. It is assumed that files older than the ignore
* threshold have already been considered or are existing files before start
- *   (when newFileOnly = true).
+ *   (when newFilesOnly = true).
* - It must not be present in the recently selected files that this class remembers.
* - It must not be newer than the time of the batch (i.e. `currentTime` for which this
* file is being tested. This can occur if the driver was recovered, and the missing batches
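The selection criteria listed in the comment above reduce to a single predicate over a candidate file. A schematic sketch, with illustrative names only (this is not Spark's actual implementation or API):

```python
# Schematic sketch of FileInputDStream's file-selection rules as described
# in the doc comment above; every name here is hypothetical.
def is_selectable(path, mod_time, batch_time, ignore_threshold,
                  recently_selected, passes_filter):
    return (passes_filter(path)                 # passes the user-provided filter
            and mod_time > ignore_threshold     # newer than the ignore threshold
            and path not in recently_selected   # not already remembered
            and mod_time <= batch_time)         # not newer than the batch time
```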