diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 995ac77a4fb3b..798847237866b 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -539,7 +539,7 @@ spark = SparkSession. ...
# Read text from socket
socketDF = spark \
- .readStream() \
+ .readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9999) \
@@ -552,7 +552,7 @@ socketDF.printSchema()
# Read all the csv files written atomically in a directory
userSchema = StructType().add("name", "string").add("age", "integer")
csvDF = spark \
- .readStream() \
+ .readStream \
.option("sep", ";") \
.schema(userSchema) \
.csv("/path/to/directory") # Equivalent to format("csv").load("/path/to/directory")
@@ -971,7 +971,7 @@ Here is the compatibility matrix.
Update mode uses watermark to drop old aggregation state.
- Complete mode does drop not old aggregation state since by definition this mode
+ Complete mode does not drop old aggregation state since by definition this mode
preserves all data in the Result Table.
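To make the contrast concrete, here is a sketch based on the guide's windowed word-count example (a streaming DataFrame `words` with columns `timestamp` and `word`), run in Update mode so that the watermark lets Spark age out state for windows that fall behind the latest event time; switching the same query to Complete mode would retain every window's state:

```python
from pyspark.sql.functions import window

# 'words' is a streaming DataFrame with columns 'timestamp' and 'word'.
windowedCounts = words \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(window(words.timestamp, "10 minutes", "5 minutes"), words.word) \
    .count()

# Update mode: aggregation state older than the watermark can be dropped.
# With .outputMode("complete"), no state is dropped, as noted above.
query = windowedCounts \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .start()
```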
@@ -1201,13 +1201,13 @@ noAggDF = deviceDataDf.select("device").where("signal > 10")
# Print new data to console
noAggDF \
- .writeStream() \
+ .writeStream \
.format("console") \
.start()
# Write new data to Parquet files
noAggDF \
- .writeStream() \
+ .writeStream \
.format("parquet") \
.option("checkpointLocation", "path/to/checkpoint/dir") \
.option("path", "path/to/destination/dir") \
@@ -1218,14 +1218,14 @@ aggDF = df.groupBy("device").count()
# Print updated aggregations to console
aggDF \
- .writeStream() \
+ .writeStream \
.outputMode("complete") \
.format("console") \
.start()
-# Have all the aggregates in an in memory table. The query name will be the table name
+# Have all the aggregates in an in-memory table. The query name will be the table name
aggDF \
- .writeStream() \
+ .writeStream \
.queryName("aggregates") \
.outputMode("complete") \
.format("memory") \
@@ -1313,7 +1313,7 @@ query.lastProgress(); // the most recent progress update of this streaming qu